aster.public

Aster RPC -- Public API Reference

This module re-exports the public API surface for documentation. Import from aster directly in your code, not from aster.public.

Getting Started

Producer (server)::

from aster import AsterServer, service, rpc, wire_type
from dataclasses import dataclass

@wire_type("myapp/GreetRequest")
@dataclass
class GreetRequest:
    name: str = ""

@wire_type("myapp/GreetResponse")
@dataclass
class GreetResponse:
    message: str = ""

@service(name="Greeter", version=1)
class Greeter:
    @rpc
    async def greet(self, req: GreetRequest) -> GreetResponse:
        return GreetResponse(message=f"Hello, {req.name}!")

async with AsterServer(services=[Greeter()]) as srv:
    print(srv.address)  # share this with consumers
    await srv.serve()

Consumer (client)::

from aster import AsterClient

client = AsterClient(address="aster1...")
await client.connect()

# Use the shell for exploration:
#   aster shell aster1...

# Or generate a typed client:
#   aster contract gen-client aster1... --out ./clients --lang python --package myapp

Decorators

Use these to define RPC services:

  • service() -- Declare a class as an RPC service
  • rpc() -- Mark a method as a unary RPC endpoint
  • server_stream() -- Mark a method as server-streaming
  • client_stream() -- Mark a method as client-streaming
  • bidi_stream() -- Mark a method as bidirectional streaming
  • wire_type() -- Register a dataclass for cross-language serialization

Authorization

Compose capability requirements:

  • any_of() -- caller must have at least one of the listed capabilities
  • all_of() -- caller must have every listed capability

Serialization

  • SerializationMode -- pick between XLANG (cross-language Fory), NATIVE (single-language Fory), ROW (zero-copy Fory rows for data-heavy workloads), and JSON (human-readable wire format for debugging and the dynamic proxy client).

Error Handling

All RPC failures raise RpcError with a StatusCode. Specific subclasses give actionable diagnostics:

  • AdmissionDeniedError -- raised when the server refuses consumer admission (open-gate vs trusted-mode mismatch, expired credential, endpoint id mismatch). The error message enumerates the common causes.
  • ContractViolationError -- raised when the strict-mode codec rejects a payload whose shape doesn't match the contract.

Interceptors

Built-in middleware that wraps every RPC call. Configure them on the AsterServer or AsterClient constructor.

  1"""
  2Aster RPC -- Public API Reference
  3=================================
  4
  5This module re-exports the public API surface for documentation.
  6Import from ``aster`` directly in your code, not from ``aster.public``.
  7
  8Getting Started
  9---------------
 10
 11**Producer** (server)::
 12
 13    from aster import AsterServer, service, rpc, wire_type
 14    from dataclasses import dataclass
 15
 16    @wire_type("myapp/GreetRequest")
 17    @dataclass
 18    class GreetRequest:
 19        name: str = ""
 20
 21    @wire_type("myapp/GreetResponse")
 22    @dataclass
 23    class GreetResponse:
 24        message: str = ""
 25
 26    @service(name="Greeter", version=1)
 27    class Greeter:
 28        @rpc
 29        async def greet(self, req: GreetRequest) -> GreetResponse:
 30            return GreetResponse(message=f"Hello, {req.name}!")
 31
 32    async with AsterServer(services=[Greeter()]) as srv:
 33        print(srv.address)  # share this with consumers
 34        await srv.serve()
 35
 36**Consumer** (client)::
 37
 38    from aster import AsterClient
 39
 40    client = AsterClient(address="aster1...")
 41    await client.connect()
 42
 43    # Use the shell for exploration:
 44    #   aster shell aster1...
 45
 46    # Or generate a typed client:
 47    #   aster contract gen-client aster1... --out ./clients --lang python --package myapp
 48
 49Decorators
 50----------
 51
 52Use these to define RPC services:
 53
 54- :func:`service` -- Declare a class as an RPC service
 55- :func:`rpc` -- Mark a method as a unary RPC endpoint
 56- :func:`server_stream` -- Mark a method as server-streaming
 57- :func:`client_stream` -- Mark a method as client-streaming
 58- :func:`bidi_stream` -- Mark a method as bidirectional streaming
 59- :func:`wire_type` -- Register a dataclass for cross-language serialization
 60
 61Authorization
 62-------------
 63
 64Compose capability requirements:
 65
 66- :func:`any_of` -- caller must have at least one of the listed capabilities
 67- :func:`all_of` -- caller must have every listed capability
 68
 69Serialization
 70-------------
 71
 72- :class:`SerializationMode` -- pick between XLANG (cross-language Fory),
 73  NATIVE (single-language Fory), ROW (zero-copy Fory rows for data-heavy
 74  workloads), and JSON (human-readable wire format for debugging and the
 75  dynamic proxy client).
 76
 77Error Handling
 78--------------
 79
 80All RPC failures raise :class:`RpcError` with a :class:`StatusCode`.
 81Specific subclasses give actionable diagnostics:
 82
 83- :class:`AdmissionDeniedError` -- raised when the server refuses
 84  consumer admission (open-gate vs trusted-mode mismatch, expired
 85  credential, endpoint id mismatch). The error message enumerates the
 86  common causes.
 87- :class:`ContractViolationError` -- raised when the strict-mode codec
 88  rejects a payload whose shape doesn't match the contract.
 89
 90Interceptors
 91------------
 92
 93Built-in middleware that wraps every RPC call. Configure them on the
 94``AsterServer`` or ``AsterClient`` constructor.
 95
 96- :class:`CallContext` -- per-call context object passed through the chain
 97- :class:`Interceptor` -- base class for custom interceptors
 98- :class:`DeadlineInterceptor` -- enforce per-call deadlines
 99- :class:`AuthInterceptor` -- token-based authentication
100- :class:`RetryInterceptor` -- automatic retry for idempotent methods
101- :class:`CircuitBreakerInterceptor` -- circuit breaker for failing endpoints
102- :class:`AuditLogInterceptor` -- log every RPC call for audit
103- :class:`MetricsInterceptor` -- collect call latency and error metrics
104"""
105
106from aster.runtime import AsterServer, AsterClient, AdmissionDeniedError
107from aster.decorators import service, rpc, server_stream, client_stream, bidi_stream
108from aster.codec import wire_type
109from aster.status import RpcError, StatusCode, ContractViolationError
110from aster.rpc_types import SerializationMode
111from aster.config import AsterConfig
112from aster.capabilities import any_of, all_of
113from aster.interceptors import (
114    CallContext,
115    Interceptor,
116    DeadlineInterceptor,
117    AuthInterceptor,
118    RetryInterceptor,
119    CircuitBreakerInterceptor,
120    AuditLogInterceptor,
121    MetricsInterceptor,
122)
123
124
125__all__ = [
126    # Server / client
127    "AsterServer",
128    "AsterClient",
129    "AsterConfig",
130    # Decorators
131    "service",
132    "rpc",
133    "server_stream",
134    "client_stream",
135    "bidi_stream",
136    "wire_type",
137    # Authorization
138    "any_of",
139    "all_of",
140    # Serialization
141    "SerializationMode",
142    # Errors
143    "RpcError",
144    "StatusCode",
145    "AdmissionDeniedError",
146    "ContractViolationError",
147    # Interceptors
148    "CallContext",
149    "Interceptor",
150    "DeadlineInterceptor",
151    "AuthInterceptor",
152    "RetryInterceptor",
153    "CircuitBreakerInterceptor",
154    "AuditLogInterceptor",
155    "MetricsInterceptor",
156]
class AsterServer:
 150class AsterServer:
 151    """High-level, declarative producer.
 152
 153    Builds a single :class:`IrohNode` that serves blobs + docs + gossip
 154    (iroh built-in protocols) alongside aster RPC (``aster/1``) and any
 155    enabled admission ALPNs -- all on **one endpoint, one node ID**.
 156
 157    When any admission gate is active (``allow_all_consumers=False`` or
 158    ``allow_all_producers=False``), the node is built with
 159    ``enable_hooks=True`` and a background task runs the Gate 0
 160    connection-level hook loop (``MeshEndpointHook.run_hook_loop``), which
 161    gates *all* protocols (blobs, docs, gossip, aster/1, admission) at the
 162    QUIC handshake layer.
 163    """
 164
 165    def __init__(
 166        self,
 167        services: list,
 168        *,
 169        config: "AsterConfig | None" = None,
 170        peer: str | None = None,
 171        identity: str | None = None,
 172        # Inline overrides (take priority over config):
 173        root_pubkey: bytes | None = None,
 174        allow_all_consumers: bool | None = None,
 175        allow_all_producers: bool | None = None,
 176        endpoint_config: EndpointConfig | None = None,
 177        # Internal wiring:
 178        channel_name: str = "rpc",
 179        codec: Any | None = None,
 180        interceptors: list[Any] | None = None,
 181        hook: MeshEndpointHook | None = None,
 182        nonce_store: Any | None = None,
 183        registry_namespace: str = "",
 184        mesh_state: MeshState | None = None,
 185        clock_drift_config: ClockDriftConfig | None = None,
 186        persist_mesh_state: bool = False,
 187    ) -> None:
 188        """Create an Aster RPC server.
 189
 190        .. note:: **Interceptors are not wired by default.** The server ships
 191           with interceptors for rate limiting, deadline enforcement, auth,
 192           capability checks, circuit breaking, metrics, audit logging, and
 193           retry hints -- but none are active unless you pass them via the
 194           ``interceptors`` parameter. For production use, wire at minimum
 195           ``DeadlineInterceptor`` and ``RateLimitInterceptor``. All
 196           interceptors live in ``aster.interceptors``.
 197
 198        Args:
 199            services: List of ``@service``-decorated class instances to serve.
 200                At least one is required.
 201            config: Optional :class:`AsterConfig` for trust, storage, and
 202                networking settings. If omitted, settings are loaded from
 203                environment variables and defaults.
 204            peer: Optional peer name for this server (used in config lookup
 205                and identity file resolution).
 206            identity: Path to ``.aster-identity`` file (default: auto-detected
 207                from CWD). Overrides ``config.identity_file``.
 208            root_pubkey: 32-byte ed25519 public key for the trust anchor.
 209                Overrides ``config.root_pubkey`` if both are set.
 210            allow_all_consumers: If ``True``, skip consumer admission
 211                (open gate). Overrides ``config.allow_all_consumers``.
 212            allow_all_producers: If ``True``, skip producer admission.
 213                Overrides ``config.allow_all_producers``.
 214            endpoint_config: Low-level iroh endpoint configuration.
 215
 216        Example::
 217
 218            @service(name="MyService", version=1)
 219            class MyService:
 220                @rpc()
 221                async def hello(self, req: HelloRequest) -> HelloResponse:
 222                    return HelloResponse(message=f"Hello {req.name}")
 223
 224            async with AsterServer(services=[MyService()]) as srv:
 225                print(srv.address)
 226                await srv.serve()
 227        """
 228        if not services:
 229            raise ValueError("AsterServer requires at least one service")
 230
 231        # Auto-load config from env if none provided.
 232        from .config import AsterConfig
 233        if config is None:
 234            config = AsterConfig.from_env()
 235        if identity is not None:
 236            config.identity_file = identity
 237        self._config = config
 238
 239        # Inline overrides win over config.
 240        self._allow_all_consumers = (
 241            allow_all_consumers if allow_all_consumers is not None
 242            else config.allow_all_consumers
 243        )
 244        self._allow_all_producers = (
 245            allow_all_producers if allow_all_producers is not None
 246            else config.allow_all_producers
 247        )
 248
 249        # Load identity file if present (.aster-identity).
 250        self._peer_name = peer
 251        secret_key_from_identity, peer_entry = config.load_identity(
 252            peer_name=peer, role="producer"
 253        )
 254        if peer_entry and not root_pubkey:
 255            # Root pubkey comes from the credential in the identity file.
 256            root_pubkey = bytes.fromhex(peer_entry["root_pubkey"])
 257        if secret_key_from_identity and not config.secret_key:
 258            import base64 as _b64
 259            config.secret_key = secret_key_from_identity
 260
 261        # Resolve root public key: inline > identity file > config file > ephemeral.
 262        # The root private key is NEVER on a running node (trust spec §1.1).
 263        pub = config.resolve_root_pubkey()
 264        self._root_pubkey = root_pubkey if root_pubkey is not None else pub
 265
 266        # Dev mode: if using an ephemeral root key (no explicit pubkey file),
 267        # auto-open the consumer gate so the quickstart works without
 268        # credential files. In production (explicit root_pubkey_file),
 269        # the default allow_all_consumers=False requires credentials.
 270        if (
 271            config._ephemeral_privkey is not None
 272            and allow_all_consumers is None
 273            and config.root_pubkey_file is None
 274        ):
 275            self._allow_all_consumers = True
 276            logger.info(
 277                "Dev mode: allow_all_consumers=True (ephemeral root key). "
 278                "Set ASTER_ROOT_PUBKEY_FILE for production admission."
 279            )
 280
 281        if (not self._allow_all_consumers or not self._allow_all_producers) and self._root_pubkey is None:
 282            raise ValueError(
 283                "root_pubkey is required when admission is enabled "
 284                "(allow_all_consumers=False or allow_all_producers=False). "
 285                "Set ASTER_ROOT_PUBKEY_FILE or pass root_pubkey= explicitly."
 286            )
 287
 288        self._services_in: list = list(services)
 289        self._endpoint_config_template = endpoint_config or config.to_endpoint_config()
 290        self._channel_name = channel_name
 291        self._codec = codec
 292        from aster.interceptors.deadline import DeadlineInterceptor
 293        if interceptors is not None:
 294            self._interceptors = list(interceptors)
 295        else:
 296            self._interceptors = [DeadlineInterceptor()]
 297        self._hook = hook
 298        self._nonce_store = nonce_store
 299
 300        # Admission → dispatch bridge: stores per-peer attributes
 301        from aster.peer_store import PeerAttributeStore
 302        self._peer_store = PeerAttributeStore()
 303        self._registry_namespace = registry_namespace
 304        self._mesh_state = mesh_state
 305        self._clock_drift_config = clock_drift_config
 306        self._persist_mesh_state = persist_mesh_state
 307
 308        # Populated by start()
 309        self._started: bool = False
 310        self._node: IrohNode | None = None
 311        self._service_summaries: list[ServiceSummary] = []
 312        self._server: Server | None = None
 313        # Lazy caches for .blobs / .docs / .gossip
 314        self._blobs: Any | None = None
 315        self._docs: Any | None = None
 316        self._gossip: Any | None = None
 317
 318        # Populated by serve()
 319        self._serve_task: asyncio.Task | None = None
 320        self._subtasks: list[asyncio.Task] = []
 321        self._closed: bool = False
 322
 323        # Producer service tokens for @aster endpoint registration.
 324        self._producer_tokens: dict[str, dict] = {}  # service_name -> token dict
 325        self._load_producer_tokens()
 326
 327        # Delegation policies for aster.admission ALPN.
 328        # Built from published_services entries that have aster_root_pubkey.
 329        self._delegation_policies: dict[str, Any] = {}  # service_name -> policy
 330        self._load_delegation_policies()
 331
 332    # ── Lifecycle ────────────────────────────────────────────────────────────
 333
 334    async def start(self) -> None:
 335        """Create the unified node and compute ``ServiceSummary`` list. Idempotent."""
 336        if self._started:
 337            return
 338
 339        # Configure structured logging from config (idempotent)
 340        from aster.logging import configure_logging
 341        configure_logging(
 342            format=self._config.log_format,
 343            level=self._config.log_level,
 344            mask=self._config.log_mask,
 345        )
 346
 347        # Determine which aster ALPNs to register on the Router.
 348        # Consumer admission is ALWAYS registered -- even in open-gate mode
 349        # the consumer uses it to discover services.
 350        aster_alpns: list[bytes] = [RPC_ALPN, ALPN_CONSUMER_ADMISSION]
 351        gate0_needed = False
 352        if not self._allow_all_consumers:
 353            if self._hook is None:
 354                self._hook = MeshEndpointHook(peer_store=self._peer_store)
 355            if self._nonce_store is None:
 356                self._nonce_store = InMemoryNonceStore()
 357            gate0_needed = True
 358        if not self._allow_all_producers:
 359            aster_alpns.append(ALPN_PRODUCER_ADMISSION)
 360            if self._hook is None:
 361                self._hook = MeshEndpointHook(peer_store=self._peer_store)
 362            gate0_needed = True
 363
 364        # Build EndpointConfig so hooks (Gate 0) are installed when needed.
 365        ep_cfg = _build_node_endpoint_config(
 366            self._endpoint_config_template, enable_hooks=gate0_needed
 367        )
 368
 369        self._node = await IrohNode.memory_with_alpns(aster_alpns, ep_cfg)
 370        addr_b64 = base64.b64encode(
 371            self._node.node_addr_info().to_bytes()
 372        ).decode()
 373
 374        # Auto-create ephemeral MeshState. Even when allow_all_producers=True
 375        # we need the topic_id so the root node's shell can observe gossip.
 376        if self._mesh_state is None and self._root_pubkey is not None:
 377            self._mesh_state = make_ephemeral_mesh_state(self._root_pubkey)
 378
 379        # Build ServiceSummary list with per-spec contract_id.
 380        summaries: list[ServiceSummary] = []
 381        for svc in self._services_in:
 382            svc_cls = svc if inspect.isclass(svc) else type(svc)
 383            info = getattr(svc_cls, "__aster_service_info__", None)
 384            if info is None:
 385                raise TypeError(
 386                    f"{svc_cls!r} is not @service-decorated "
 387                    f"(missing __aster_service_info__)"
 388                )
 389            cid = contract_id_from_service(svc_cls)
 390            summaries.append(
 391                ServiceSummary(
 392                    name=info.name,
 393                    version=info.version,
 394                    contract_id=cid,
 395                    channels={self._channel_name: addr_b64},
 396                    pattern=info.scoped or "shared",
 397                )
 398            )
 399        self._service_summaries = summaries
 400
 401        # ── C11.4.3: Startup publication verification ────────────────────
 402        # If .aster/manifest.json exists, verify that each service's live
 403        # contract_id matches the committed manifest. Fatal on mismatch.
 404        manifest_path = os.path.join(os.getcwd(), ".aster", "manifest.json")
 405        if os.path.isfile(manifest_path):
 406            import json as _json
 407            from .contract.manifest import ContractManifest
 408
 409            try:
 410                with open(manifest_path, encoding="utf-8") as f:
 411                    manifest_data = _json.load(f)
 412            except Exception as exc:
 413                raise RuntimeError(
 414                    f"Failed to read manifest at {manifest_path}: {exc}"
 415                ) from exc
 416
 417            # Support both single manifest and list of manifests
 418            manifests_list = manifest_data if isinstance(manifest_data, list) else [manifest_data]
 419            manifest_by_key: dict[tuple[str, int], ContractManifest] = {}
 420            for md in manifests_list:
 421                m = ContractManifest(**md)
 422                manifest_by_key[(m.service, m.version)] = m
 423
 424            for summary in summaries:
 425                key = (summary.name, summary.version)
 426                manifest = manifest_by_key.get(key)
 427                if manifest is None:
 428                    continue  # Service not in manifest -- skip
 429                if summary.contract_id != manifest.contract_id:
 430                    raise RuntimeError(
 431                        f"Contract identity mismatch for {summary.name!r} "
 432                        f"v{summary.version}:\n"
 433                        f"  Expected (manifest): {manifest.contract_id}\n"
 434                        f"  Actual (live):       {summary.contract_id}\n"
 435                        f"  Manifest: {manifest_path}\n"
 436                        f"  -> The service interface has changed without "
 437                        f"updating the manifest.\n"
 438                        f"     Rerun `aster contract gen` and commit the "
 439                        f"updated manifest."
 440                    )
 441            logger.debug("Manifest verification passed for %d service(s)", len(manifest_by_key))
 442
 443        # ── Contract publication to registry doc ─────────────────────────
 444        # Create a registry doc, publish contract collections (with manifest
 445        # including method schemas), and generate a read-only share ticket
 446        # so consumers can discover full contract metadata.
 447        await self._publish_contracts()
 448
 449        # ── Gate 3: capability interceptor & default-deny warning ────────
 450        #
 451        # Build a service_name -> ServiceInfo map for the CapabilityInterceptor
 452        # and check whether any service has authorization configured.
 453        from .interceptors.capability import CapabilityInterceptor
 454
 455        svc_map: dict[str, Any] = {}
 456        any_has_requires = False
 457        for svc in self._services_in:
 458            svc_cls = svc if inspect.isclass(svc) else type(svc)
 459            info = getattr(svc_cls, "__aster_service_info__", None)
 460            if info is not None:
 461                svc_map[info.name] = info
 462                if info.requires is not None:
 463                    any_has_requires = True
 464                for mi in info.methods.values():
 465                    if mi.requires is not None:
 466                        any_has_requires = True
 467
 468        # Auto-wire the CapabilityInterceptor when trust is configured
 469        # (Gate 0 enabled) or when any service declares requires.
 470        has_auth_interceptor = any(
 471            isinstance(i, CapabilityInterceptor) for i in self._interceptors
 472        )
 473        if (not self._allow_all_consumers or any_has_requires) and not has_auth_interceptor:
 474            # Prepend so capability is checked before application interceptors.
 475            self._interceptors.insert(0, CapabilityInterceptor(svc_map))
 476
 477        # S12.2: default-deny startup warning.
 478        # When Gate 0 is disabled (allow_all_consumers=True), any service
 479        # without explicit authorization is wide open.  Emit a warning so
 480        # the developer knows.
 481        if self._allow_all_consumers:
 482            for svc in self._services_in:
 483                svc_cls = svc if inspect.isclass(svc) else type(svc)
 484                info = getattr(svc_cls, "__aster_service_info__", None)
 485                if info is None:
 486                    continue
 487                if info.public:
 488                    continue
 489                svc_has_auth = info.requires is not None
 490                if not svc_has_auth:
 491                    svc_has_auth = any(
 492                        mi.requires is not None for mi in info.methods.values()
 493                    )
 494                if not svc_has_auth:
 495                    # Also check if user explicitly added an auth interceptor.
 496                    from .interceptors.auth import AuthInterceptor
 497                    has_explicit_auth = any(
 498                        isinstance(i, AuthInterceptor) for i in self._interceptors
 499                    )
 500                    if not has_explicit_auth:
 501                        warnings.warn(
 502                            f"Service '{info.name}' has no authorization configured "
 503                            f"and Gate 0 is disabled (allow_all_consumers=True). "
 504                            f"All consumers can call this service without "
 505                            f"authentication. Add @service(requires=...) or "
 506                            f"configure an auth interceptor for production use.",
 507                            UserWarning,
 508                            stacklevel=2,
 509                        )
 510
 511        # Server borrows a NetClient view of the node. AsterServer owns the
 512        # node lifecycle, so Server must NOT close the endpoint on its own.
 513        self._server = Server(
 514            net_client(self._node),
 515            services=self._services_in,
 516            codec=self._codec,
 517            interceptors=self._interceptors,
 518            owns_endpoint=False,
 519            peer_store=self._peer_store,
 520        )
 521
 522        self._print_banner()
 523        self._started = True
 524
 525        # Always log startup info (visible even when stderr is not a TTY)
 526        service_names = ", ".join(s.name for s in self._service_summaries)
 527        mode = "open-gate" if self._allow_all_consumers else "trusted"
 528        logger.info("server starting runtime=python services=[%s] mode=%s", service_names, mode)
 529
 530    def _print_banner(self) -> None:
 531        """Print the startup banner with service info."""
 532        import sys
 533        import os
 534
 535        # Only print banner when stderr is a terminal (not in tests/pipes)
 536        if not sys.stderr.isatty():
 537            return
 538
 539        C = "\033[36m"   # cyan
 540        B = "\033[1m"    # bold
 541        D = "\033[2m"    # dim
 542        G = "\033[32m"   # green
 543        Y = "\033[33m"   # yellow
 544        W = "\033[37m"   # white
 545        R = "\033[0m"    # reset
 546        w = sys.stderr.write
 547
 548        # ── Banner ────────────────────────────────────────────────────────
 549        w(f"\n{C}{B}")
 550        w(f"        _    ____ _____ _____ ____\n")
 551        w(f"       / \\  / ___|_   _| ____|  _ \\\n")
 552        w(f"      / _ \\ \\___ \\ | | |  _| | |_) |\n")
 553        w(f"     / ___ \\ ___) || | | |___|  _ <\n")
 554        w(f"    /_/   \\_\\____/ |_| |_____|_| \\_\\\n")
 555        w(f"{R}\n")
 556        w(f"    {D}RPC after hostnames.{R}\n\n")
 557
 558        # ── Services table ────────────────────────────────────────────────
 559        if self._service_summaries:
 560            # Find max name length for alignment
 561            max_name = max(len(s.name) for s in self._service_summaries)
 562            for s in self._service_summaries:
 563                name_pad = s.name.ljust(max_name)
 564                w(f"    {G}{R} {B}{name_pad}{R}  {D}v{s.version}{R}  {D}{s.contract_id}{R}\n")
 565            w("\n")
 566
 567        # ── Endpoint ─────────────────────────────────────────────────────
 568        compact = None
 569        endpoint_id_full = None
 570        if self._node:
 571            try:
 572                from . import AsterTicket
 573                addr_info = self._node.node_addr_info()
 574                endpoint_id_full = addr_info.endpoint_id
 575                t = AsterTicket(
 576                    endpoint_id=addr_info.endpoint_id,
 577                    direct_addrs=addr_info.direct_addresses or [],
 578                )
 579                compact = t.to_string()
 580            except Exception:
 581                pass
 582
 583        if endpoint_id_full:
 584            short = endpoint_id_full[:16] + "…"
 585            w(f"    {D}node id:{R}   {W}{short}{R}  {D}(this node's keypair fingerprint){R}\n")
 586        if compact:
 587            w(f"    {D}endpoint:{R}  {compact}\n")
 588
 589        # ── Mode ──────────────────────────────────────────────────────────
 590        mode_parts = []
 591        if self._allow_all_consumers:
 592            mode_parts.append(f"{Y}open-gate{R}")
 593        else:
 594            mode_parts.append(f"{G}trusted{R}")
 595        if self._registry_namespace:
 596            mode_parts.append(f"{G}registry{R}")
 597        w(f"    {D}mode:{R}      {' '.join(mode_parts)}\n")
 598
 599        # ── Logging ───────────────────────────────────────────────────────
 600        log_format = os.environ.get("ASTER_LOG_FORMAT", "text")
 601        log_level = os.environ.get("ASTER_LOG_LEVEL", "info")
 602        w(f"    {D}log:{R}       ASTER_LOG_FORMAT={W}{log_format}{R}  ASTER_LOG_LEVEL={W}{log_level}{R}\n")
 603
 604        # ── Versions ──────────────────────────────────────────────────────
 605        try:
 606            from importlib.metadata import version as _pkg_version
 607            aster_ver = _pkg_version("aster-rpc")
 608        except Exception:
 609            aster_ver = "?"
 610
 611        # Read iroh version from the native module
 612        iroh_ver = "0.97"  # pinned in Cargo.toml
 613
 614        w(f"    {D}runtime:{R}   aster-rpc {aster_ver} (python)  iroh {iroh_ver}\n")
 615
 616        # ── Copyright ─────────────────────────────────────────────────────
 617        w(f"\n    {D}Copyright \u00a9 2026 Emrul Islam. All rights reserved.{R}\n\n")
 618
 619    async def _publish_contracts(self) -> None:
 620        """Create a registry doc and publish each service's contract collection.
 621
 622        After publication, ``self._registry_namespace`` is set to the 64-char
 623        hex namespace_id so consumer admission can return it.
 624        """
 625        assert self._node is not None
 626
 627        try:
 628            dc = docs_client(self._node)
 629            bc = blobs_client(self._node)
 630
 631            # Create the registry doc (producer owns the write capability)
 632            registry_doc = await dc.create()
 633            author_id = await dc.create_author()
 634
 635            for svc in self._services_in:
 636                svc_cls = svc if inspect.isclass(svc) else type(svc)
 637                info = getattr(svc_cls, "__aster_service_info__", None)
 638                if info is None:
 639                    continue
 640
 641                # Build the type graph and contract
 642                from .contract.identity import (
 643                    ServiceContract,
 644                    build_type_graph,
 645                    canonical_xlang_bytes,
 646                    resolve_with_cycles,
 647                    compute_type_hash,
 648                )
 649                from .contract.publication import build_collection, upload_collection
 650                from .registry.keys import contract_key, version_key
 651                from .registry.models import ArtifactRef
 652
 653                # Collect root types
 654                root_types: list[type] = []
 655                for mi in info.methods.values():
 656                    if mi.request_type is not None:
 657                        root_types.append(mi.request_type)
 658                    if mi.response_type is not None:
 659                        root_types.append(mi.response_type)
 660
 661                type_graph = build_type_graph(root_types)
 662                type_defs = resolve_with_cycles(type_graph)
 663
 664                # Compute type hashes
 665                type_hashes: dict[str, bytes] = {}
 666                for fqn, td in type_defs.items():
 667                    td_bytes = canonical_xlang_bytes(td)
 668                    type_hashes[fqn] = compute_type_hash(td_bytes)
 669
 670                # Build ServiceContract and canonical bytes
 671                contract = ServiceContract.from_service_info(info, type_hashes)
 672                contract_bytes = canonical_xlang_bytes(contract)
 673
 674                import blake3 as _blake3
 675                contract_id = _blake3.blake3(contract_bytes).hexdigest()
 676
 677                # Build collection with full method schemas
 678                entries = build_collection(contract, type_defs, service_info=info)
 679
 680                # Upload to blob store as a native HashSeq collection.
 681                # GC protection is handled automatically by the HashSeq tag.
 682                collection_hash = await upload_collection(bc, entries)
 683
 684                # Create a collection ticket so consumers can download all
 685                # collection blobs (index + entries) in one transfer
 686                blob_ticket = bc.create_collection_ticket(collection_hash)
 687
 688                # Write ArtifactRef to registry doc
 689                import time as _time
 690                ref = ArtifactRef(
 691                    contract_id=contract_id,
 692                    collection_hash=collection_hash,
 693                    ticket=blob_ticket,
 694                    published_by=author_id,
 695                    published_at_epoch_ms=int(_time.time() * 1000),
 696                    collection_format="index",
 697                )
 698                await registry_doc.set_bytes(
 699                    author_id,
 700                    contract_key(contract_id),
 701                    ref.to_json().encode(),
 702                )
 703
 704                # Also write the manifest JSON directly to the registry doc
 705                # at a well-known key. This avoids the blob download round-trip
 706                # for consumers that only need method schemas (like the shell).
 707                manifest_data = None
 708                for ename, edata in entries:
 709                    if ename == "manifest.json":
 710                        manifest_data = edata
 711                        break
 712                if manifest_data:
 713                    from .registry.keys import version_key as _vk
 714                    manifest_key = f"manifests/{contract_id}".encode()
 715                    await registry_doc.set_bytes(
 716                        author_id, manifest_key, manifest_data
 717                    )
 718
 719                # Version pointer
 720                await registry_doc.set_bytes(
 721                    author_id,
 722                    version_key(info.name, info.version),
 723                    contract_id.encode(),
 724                )
 725
 726                logger.debug(
 727                    "Published contract %s for %s v%d (collection=%s)",
 728                    contract_id[:12],
 729                    info.name,
 730                    info.version,
 731                    collection_hash[:12],
 732                )
 733
 734            # share() enables the sync engine on the server side so consumers
 735            # can replicate this doc.  We only need the namespace_id on the wire.
 736            await registry_doc.share_with_addr("read")
 737            self._registry_namespace = registry_doc.doc_id()
 738            logger.debug(
 739                "Registry doc ready -- namespace: %s", self._registry_namespace[:16]
 740            )
 741
 742        except Exception as exc:
 743            # Publication failure is non-fatal -- the server still works,
 744            # consumers just won't get rich contract metadata
 745            logger.warning("Contract publication failed (non-fatal): %s", exc)
 746
 747    def serve(self) -> asyncio.Task:
 748        """Spawn the accept loop (+ Gate 0 hook loop); return an aggregate task.
 749
 750        ``await server.serve()`` blocks until cancellation. The second call
 751        returns the same task (idempotent).
 752        """
 753        if self._serve_task is not None:
 754            return self._serve_task
 755        if not self._started:
 756            raise RuntimeError("AsterServer.serve() called before start()")
 757        assert self._server is not None
 758        assert self._node is not None
 759
 760        subtasks: list[asyncio.Task] = []
 761
 762        # Gate 0 hook loop: drain the after-handshake channel, apply the
 763        # MeshEndpointHook allowlist for every connection. before_connect is
 764        # auto-accepted inside NodeHookReceiver (the peer's endpoint ID
 765        # isn't authenticated at that stage).
 766        if self._hook is not None and self._node.has_hooks():
 767            self._hook_loop_task = asyncio.create_task(
 768                self._run_gate0(), name="aster-gate0"
 769            )
 770            subtasks.append(self._hook_loop_task)
 771
 772        self._peer_store.start_reaper()
 773
 774        subtasks.append(
 775            asyncio.create_task(self._accept_loop(), name="aster-accept")
 776        )
 777
 778        # Delegated admission loop: accept connections on aster.admission
 779        # ALPN for @aster-issued enrollment tokens.
 780        if self._delegation_policies:
 781            subtasks.append(
 782                asyncio.create_task(
 783                    self._delegated_admission_loop(), name="aster-delegated-admission"
 784                )
 785            )
 786
 787        # Auto-register endpoints with @aster for published services.
 788        # Requires producer service tokens (from `aster publish`).
 789        if self._producer_tokens:
 790            subtasks.append(
 791                asyncio.create_task(
 792                    self._aster_registration_loop(), name="aster-registration"
 793                )
 794            )
 795
 796        self._subtasks = subtasks
 797
 798        async def _wait_all() -> None:
 799            try:
 800                await asyncio.gather(*subtasks, return_exceptions=True)
 801            except asyncio.CancelledError:
 802                # Graceful shutdown on Ctrl+C / task cancellation
 803                logger.info("Server shutting down...")
 804                for t in subtasks:
 805                    t.cancel()
 806                await asyncio.gather(*subtasks, return_exceptions=True)
 807
 808        self._serve_task = asyncio.create_task(_wait_all(), name="aster-server-serve")
 809        return self._serve_task
 810
 811    async def _run_gate0(self) -> None:
 812        """Take the hook receiver from the node and run the hook loop."""
 813        assert self._node is not None
 814        assert self._hook is not None
 815        receiver = await self._node.take_hook_receiver()
 816        if receiver is None:
 817            logger.warning("AsterServer: hooks enabled but no receiver available")
 818            return
 819        await self._hook.run_hook_loop(receiver)
 820
 821    async def _accept_loop(self) -> None:
 822        """Pull from ``node.accept_aster()`` and dispatch per ALPN."""
 823        assert self._node is not None
 824        assert self._server is not None
 825        services_snapshot = list(self._service_summaries)
 826        try:
 827            while True:
 828                try:
 829                    alpn, conn = await self._node.accept_aster()
 830                except asyncio.CancelledError:
 831                    raise
 832                except Exception as exc:  # noqa: BLE001
 833                    logger.warning("AsterServer: accept_aster failed: %s", exc)
 834                    continue
 835
 836                if alpn == RPC_ALPN:
 837                    asyncio.create_task(
 838                        self._server.handle_connection(conn),
 839                        name="aster-rpc-conn",
 840                    )
 841                elif alpn == ALPN_CONSUMER_ADMISSION:
 842                    # Always handle consumer admission -- even when
 843                    # allow_all_consumers=True the consumer needs the
 844                    # services list from the admission response.
 845                    asyncio.create_task(
 846                        handle_consumer_admission_connection(
 847                            conn,
 848                            root_pubkey=self._root_pubkey or b"\x00" * 32,
 849                            hook=self._hook,
 850                            nonce_store=self._nonce_store,
 851                            services_getter=lambda: services_snapshot,
 852                            registry_namespace_getter=lambda: self._registry_namespace,
 853                            allow_unenrolled=self._allow_all_consumers,
 854                            peer_store=self._peer_store,
 855                            gossip_topic_getter=lambda: (
 856                                self._mesh_state.topic_id
 857                                if self._mesh_state else None
 858                            ),
 859                        ),
 860                        name="aster-consumer-admission-conn",
 861                    )
 862                elif alpn == ALPN_PRODUCER_ADMISSION and not self._allow_all_producers:
 863                    assert self._root_pubkey is not None
 864                    assert self._mesh_state is not None
 865                    asyncio.create_task(
 866                        handle_producer_admission_connection(
 867                            conn,
 868                            own_root_pubkey=self._root_pubkey,
 869                            own_state=self._mesh_state,
 870                            config=self._clock_drift_config,
 871                            persist_state=self._persist_mesh_state,
 872                        ),
 873                        name="aster-producer-admission-conn",
 874                    )
 875                else:
 876                    try:
 877                        conn.close(0, b"unexpected alpn")
 878                    except Exception:  # noqa: BLE001
 879                        pass
 880        except asyncio.CancelledError:
 881            pass
 882
 883    async def drain(self, grace_period: float = 10.0) -> None:
 884        """Graceful shutdown: stop accepting new connections, drain existing ones.
 885
 886        Compatible with Kubernetes ``preStop`` hooks and SIGTERM handling.
 887        After drain completes, call ``close()`` to shut down the node.
 888
 889        Args:
 890            grace_period: Seconds to wait for in-flight requests to complete.
 891        """
 892        logger.info("Draining server (grace_period=%.1fs)...", grace_period)
 893        if self._server is not None:
 894            await self._server.drain(grace_period)
 895        logger.info("Drain complete")
 896
 897    async def close(self) -> None:
 898        """Cancel serve loops and close the node. Safe to call multiple times."""
 899        if self._closed:
 900            return
 901        self._closed = True
 902
 903        for t in self._subtasks:
 904            t.cancel()
 905        if self._serve_task is not None:
 906            self._serve_task.cancel()
 907            try:
 908                await self._serve_task
 909            except (asyncio.CancelledError, Exception):
 910                pass
 911
 912        # Close the node -- this triggers router.shutdown() which closes all
 913        # protocol handlers (including aster queue handlers) and the endpoint.
 914        if self._node is not None:
 915            try:
 916                await self._node.close()
 917            except Exception:
 918                pass
 919
 920    # ── @aster endpoint registration ──────────────────────────────────────
 921
 922    def _load_producer_tokens(self) -> None:
 923        """Load producer service tokens from .aster-identity [published_services.*]."""
 924        _, peer_entry = self._config.load_identity(
 925            peer_name=self._peer_name, role="producer"
 926        )
 927        if not peer_entry:
 928            return
 929
 930        # The identity loader returns the raw peer dict. Published services
 931        # are stored as [published_services.<ServiceName>] sections in the
 932        # .aster-identity TOML file.
 933        published = peer_entry.get("published_services", {})
 934        if not isinstance(published, dict):
 935            return
 936
 937        for svc_name, token_data in published.items():
 938            if isinstance(token_data, dict) and token_data.get("producer_token"):
 939                self._producer_tokens[svc_name] = token_data
 940                logger.debug("Loaded producer token for %s", svc_name)
 941
 942    def _load_delegation_policies(self) -> None:
 943        """Build DelegatedAdmissionPolicy for each published service with aster_root_pubkey."""
 944        from aster.trust.delegated import DelegatedAdmissionPolicy
 945
 946        _, peer_entry = self._config.load_identity(
 947            peer_name=self._peer_name, role="producer"
 948        )
 949        if not peer_entry:
 950            return
 951
 952        published = peer_entry.get("published_services", {})
 953        if not isinstance(published, dict):
 954            return
 955
 956        # Match published services to the services we're hosting
 957        for svc_name, pub_data in published.items():
 958            if not isinstance(pub_data, dict):
 959                continue
 960            aster_root_pubkey = pub_data.get("aster_root_pubkey", "")
 961            contract_id = pub_data.get("contract_id", "")
 962            handle = pub_data.get("handle", peer_entry.get("handle", ""))
 963            if aster_root_pubkey and contract_id:
 964                self._delegation_policies[svc_name] = DelegatedAdmissionPolicy(
 965                    target_handle=handle,
 966                    target_service=svc_name,
 967                    target_contract_id=contract_id,
 968                    aster_root_pubkey=aster_root_pubkey,
 969                )
 970                logger.debug("Delegation policy loaded for %s", svc_name)
 971
 972    async def _delegated_admission_loop(self) -> None:
 973        """Accept connections on aster.admission ALPN and verify delegated tokens."""
 974        from aster.trust.delegated import handle_delegated_admission_connection
 975
 976        assert self._node is not None
 977        ALPN_DELEGATED = b"aster.admission"
 978
 979        try:
 980            while not self._closed:
 981                try:
 982                    conn = await self._node.accept_aster(ALPN_DELEGATED)
 983                except asyncio.CancelledError:
 984                    return
 985                except Exception as exc:
 986                    logger.debug("delegated admission accept error: %s", exc)
 987                    continue
 988
 989                # Determine which policy applies based on connection metadata
 990                # For now, use the first available policy (single-service producer)
 991                # or match by peer request content
 992                policy = next(iter(self._delegation_policies.values()), None)
 993                if policy is None:
 994                    logger.warning("delegated admission: no policy configured")
 995                    continue
 996
 997                asyncio.create_task(
 998                    handle_delegated_admission_connection(
 999                        conn,
1000                        policy=policy,
1001                        hook=self._hook,
1002                        peer_store=self._peer_store,
1003                    )
1004                )
1005        except asyncio.CancelledError:
1006            pass
1007
1008    async def _aster_registration_loop(self) -> None:
1009        """Background loop: register endpoints with @aster for published services.
1010
1011        Connects to @aster, registers each service's endpoint, then
1012        re-registers periodically before TTL expiry.
1013        """
1014        import json as _json
1015
1016        # Wait a moment for the server to be fully ready
1017        await asyncio.sleep(2)
1018
1019        ttl = 300  # 5 minutes
1020        interval = ttl * 0.75  # re-register at 75% of TTL
1021
1022        while not self._closed:
1023            try:
1024                await self._register_endpoints_with_aster(ttl)
1025            except asyncio.CancelledError:
1026                return
1027            except Exception as exc:
1028                logger.warning("@aster registration failed: %s", exc)
1029
1030            # Wait before re-registering
1031            try:
1032                await asyncio.sleep(interval)
1033            except asyncio.CancelledError:
1034                return
1035
1036    async def _register_endpoints_with_aster(self, ttl: int) -> None:
1037        """One-shot registration of all published service endpoints."""
1038        if not self._node or not self._producer_tokens:
1039            return
1040
1041        # Build our endpoint info
1042        addr_info = self._node.node_addr_info()
1043        node_id = addr_info.endpoint_id
1044        relay = addr_info.relay_url or ""
1045        direct_addrs = addr_info.direct_addresses or []
1046
1047        # Resolve @aster address from identity file or profile config.
1048        # The token itself contains the root_pubkey which can be used
1049        # to discover @aster via DNS TXT record in production.
1050        aster_addr = self._resolve_aster_address()
1051        if not aster_addr:
1052            logger.debug("No @aster address configured -- skipping registration")
1053            return
1054
1055        aster_client = AsterClient(address=aster_addr)
1056
1057        try:
1058            await aster_client.connect()
1059        except Exception as exc:
1060            logger.debug("Could not connect to @aster: %s", exc)
1061            return
1062
1063        try:
1064            # For each published service with a token, call register_endpoint
1065            for svc_name, token in self._producer_tokens.items():
1066                try:
1067                    # Use the dynamic invoke path -- we don't have generated
1068                    # types for @aster's PublicationService
1069                    import json as _json
1070                    request = {
1071                        "producer_token": _json.dumps(token),
1072                        "node_id": node_id,
1073                        "relay": relay,
1074                        "direct_addrs": direct_addrs,
1075                        "ttl": ttl,
1076                    }
1077
1078                    # Invoke register_endpoint on PublicationService
1079                    conn = await aster_client._rpc_conn_for(
1080                        next(
1081                            (s.channels.get("rpc", "") for s in aster_client._services
1082                             if s.name == "PublicationService"),
1083                            ""
1084                        )
1085                    )
1086                    from .transport.iroh import IrohTransport
1087                    transport = IrohTransport(conn, codec=self._codec)
1088                    resp = await transport.unary(
1089                        "PublicationService", "register_endpoint", request
1090                    )
1091                    logger.info(
1092                        "Registered endpoint with @aster: %s (%s)",
1093                        svc_name, node_id[:12],
1094                    )
1095                except Exception as exc:
1096                    logger.warning(
1097                        "Failed to register %s with @aster: %s",
1098                        svc_name, exc,
1099                    )
1100        finally:
1101            await aster_client.close()
1102
1103    def _resolve_aster_address(self) -> str | None:
1104        """Resolve the @aster service address for endpoint registration.
1105
1106        Checks (in order):
1107        1. ASTER_SERVICE_ADDRESS env var
1108        2. aster_service.address in the identity file's peer entry
1109        3. DNS TXT record on aster.site (future)
1110        """
1111        # Env var override
1112        addr = os.environ.get("ASTER_SERVICE_ADDRESS", "")
1113        if addr:
1114            return addr
1115
1116        # Identity file -- the peer entry may have aster_service config
1117        _, peer_entry = self._config.load_identity(
1118            peer_name=self._peer_name, role="producer"
1119        )
1120        if peer_entry:
1121            addr = peer_entry.get("aster_service", "")
1122            if addr:
1123                return addr
1124
1125        return None
1126
1127    def _install_signal_handlers(self, grace_period: float = 10.0) -> None:
1128        """Install SIGTERM/SIGINT handlers for graceful shutdown.
1129
1130        Call after ``serve()`` to enable k8s-compatible shutdown:
1131
1132        - SIGTERM: drain → close (graceful)
1133        - SIGINT (Ctrl+C): drain → close (graceful)
1134        - Second SIGINT: immediate exit
1135
1136        Usage::
1137
1138            async with AsterServer(services=[...]) as srv:
1139                srv.install_signal_handlers()
1140                await srv.serve()
1141        """
1142        import signal
1143
1144        loop = asyncio.get_event_loop()
1145        shutdown_count = 0
1146
1147        def _handle_signal(sig: int, frame: Any) -> None:
1148            nonlocal shutdown_count
1149            shutdown_count += 1
1150            if shutdown_count > 1:
1151                logger.warning("Forced exit (second signal)")
1152                sys.exit(1)
1153            logger.info("Received %s -- draining...", signal.Signals(sig).name)
1154            loop.create_task(self._graceful_shutdown(grace_period))
1155
1156        signal.signal(signal.SIGTERM, _handle_signal)
1157        signal.signal(signal.SIGINT, _handle_signal)
1158
1159    async def _graceful_shutdown(self, grace_period: float) -> None:
1160        """Internal: drain then close."""
1161        try:
1162            await self.drain(grace_period)
1163        finally:
1164            await self.close()
1165
1166    async def __aenter__(self) -> "AsterServer":
1167        await self.start()
1168        self.serve()
1169        return self
1170
1171    async def __aexit__(self, exc_type, exc, tb) -> None:
1172        await self.close()
1173
1174    # ── Properties ───────────────────────────────────────────────────────────
1175
1176    @property
1177    def address(self) -> str:
1178        """Connection address for this server (``aster1...`` ticket).
1179
1180        Pass this to ``AsterClient(address=...)`` or ``aster shell``
1181        to connect.  Includes relay address (if available) and direct
1182        addresses for LAN connectivity.
1183        """
1184        self._require_started()
1185        assert self._node is not None
1186        from . import AsterTicket
1187        addr_info = self._node.node_addr_info()
1188
1189        # Resolve relay URL to IP:port for the ticket
1190        relay_addr = None
1191        if addr_info.relay_url:
1192            relay_addr = _resolve_relay_addr(addr_info.relay_url)
1193
1194        t = AsterTicket(
1195            endpoint_id=addr_info.endpoint_id,
1196            relay_addr=relay_addr,
1197            direct_addrs=addr_info.direct_addresses or [],
1198        )
1199        return t.to_string()
1200
1201    @property
1202    def endpoint_id(self) -> str:
1203        """Hex endpoint ID of this server's node."""
1204        self._require_started()
1205        assert self._node is not None
1206        return self._node.node_id()
1207
1208    # ── Back-compat aliases ──────────────────────────────────────────────
1209
1210    @property
1211    def _ticket(self) -> str:
1212        """Alias for :attr:`address` (internal back-compat)."""
1213        return self.address
1214
1215    # Back-compat alias -- used in tests
1216    @property
1217    def endpoint_addr_b64(self) -> str:
1218        self._require_started()
1219        assert self._node is not None
1220        return base64.b64encode(self._node.node_addr_info().to_bytes()).decode()
1221
1222    @property
1223    def _rpc_addr_b64(self) -> str:
1224        return self.endpoint_addr_b64
1225
1226    @property
1227    def _admission_addr_b64(self) -> str | None:
1228        if self._allow_all_consumers and self._allow_all_producers:
1229            return None
1230        return self.endpoint_addr_b64
1231
1232    @property
1233    def _consumer_admission_addr_b64(self) -> str | None:
1234        if self._allow_all_consumers:
1235            return None
1236        return self.endpoint_addr_b64
1237
1238    @property
1239    def _producer_admission_addr_b64(self) -> str | None:
1240        if self._allow_all_producers:
1241            return None
1242        return self.endpoint_addr_b64
1243
1244    @property
1245    def services(self) -> list[ServiceSummary]:
1246        """List of services hosted by this server."""
1247        self._require_started()
1248        return list(self._service_summaries)
1249
1250    @property
1251    def root_pubkey(self) -> bytes | None:
1252        """The 32-byte ed25519 trust anchor public key, or ``None``."""
1253        return self._root_pubkey
1254
1255    # ── Iroh protocol clients (lazy) ─────────────────────────────────────────
1256
1257    @property
1258    def node(self) -> IrohNode:
1259        """The underlying ``IrohNode`` (escape hatch for direct iroh access)."""
1260        self._require_started()
1261        assert self._node is not None
1262        return self._node
1263
1264    @property
1265    def blobs(self) -> Any:
1266        """Blobs client backed by this node."""
1267        self._require_started()
1268        if self._blobs is None:
1269            self._blobs = blobs_client(self._node)
1270        return self._blobs
1271
1272    @property
1273    def docs(self) -> Any:
1274        """Docs client backed by this node."""
1275        self._require_started()
1276        if self._docs is None:
1277            self._docs = docs_client(self._node)
1278        return self._docs
1279
1280    @property
1281    def gossip(self) -> Any:
1282        """Gossip client backed by this node."""
1283        self._require_started()
1284        if self._gossip is None:
1285            self._gossip = gossip_client(self._node)
1286        return self._gossip
1287
1288    # Back-compat aliases
1289    @property
1290    def endpoint(self) -> Any:
1291        """Escape hatch: the ``NetClient`` view of this node's endpoint."""
1292        self._require_started()
1293        return net_client(self._node)
1294
1295    @property
1296    def _rpc_endpoint(self) -> Any:
1297        return self.endpoint
1298
1299    def _require_started(self) -> None:
1300        if not self._started:
1301            raise RuntimeError("AsterServer not started; call start() first")

High-level, declarative producer.

Builds a single IrohNode that serves blobs + docs + gossip (iroh built-in protocols) alongside aster RPC (aster/1) and any enabled admission ALPNs -- all on one endpoint, one node ID.

When any admission gate is active (allow_all_consumers=False or allow_all_producers=False), the node is built with enable_hooks=True and a background task runs the Gate 0 connection-level hook loop (MeshEndpointHook.run_hook_loop), which gates all protocols (blobs, docs, gossip, aster/1, admission) at the QUIC handshake layer.

AsterServer( services: list, *, config: aster.config.AsterConfig | None = None, peer: str | None = None, identity: str | None = None, root_pubkey: bytes | None = None, allow_all_consumers: bool | None = None, allow_all_producers: bool | None = None, endpoint_config: EndpointConfig | None = None, channel_name: str = 'rpc', codec: typing.Any | None = None, interceptors: list[typing.Any] | None = None, hook: aster.trust.hooks.MeshEndpointHook | None = None, nonce_store: typing.Any | None = None, registry_namespace: str = '', mesh_state: aster.trust.mesh.MeshState | None = None, clock_drift_config: aster.trust.mesh.ClockDriftConfig | None = None, persist_mesh_state: bool = False)
165    def __init__(
166        self,
167        services: list,
168        *,
169        config: "AsterConfig | None" = None,
170        peer: str | None = None,
171        identity: str | None = None,
172        # Inline overrides (take priority over config):
173        root_pubkey: bytes | None = None,
174        allow_all_consumers: bool | None = None,
175        allow_all_producers: bool | None = None,
176        endpoint_config: EndpointConfig | None = None,
177        # Internal wiring:
178        channel_name: str = "rpc",
179        codec: Any | None = None,
180        interceptors: list[Any] | None = None,
181        hook: MeshEndpointHook | None = None,
182        nonce_store: Any | None = None,
183        registry_namespace: str = "",
184        mesh_state: MeshState | None = None,
185        clock_drift_config: ClockDriftConfig | None = None,
186        persist_mesh_state: bool = False,
187    ) -> None:
188        """Create an Aster RPC server.
189
190        .. note:: **Interceptors are not wired by default.** The server ships
191           with interceptors for rate limiting, deadline enforcement, auth,
192           capability checks, circuit breaking, metrics, audit logging, and
193           retry hints -- but none are active unless you pass them via the
194           ``interceptors`` parameter. For production use, wire at minimum
195           ``DeadlineInterceptor`` and ``RateLimitInterceptor``. All
196           interceptors live in ``aster.interceptors``.
197
198        Args:
199            services: List of ``@service``-decorated class instances to serve.
200                At least one is required.
201            config: Optional :class:`AsterConfig` for trust, storage, and
202                networking settings. If omitted, settings are loaded from
203                environment variables and defaults.
204            peer: Optional peer name for this server (used in config lookup
205                and identity file resolution).
206            identity: Path to ``.aster-identity`` file (default: auto-detected
207                from CWD). Overrides ``config.identity_file``.
208            root_pubkey: 32-byte ed25519 public key for the trust anchor.
209                Overrides ``config.root_pubkey`` if both are set.
210            allow_all_consumers: If ``True``, skip consumer admission
211                (open gate). Overrides ``config.allow_all_consumers``.
212            allow_all_producers: If ``True``, skip producer admission.
213                Overrides ``config.allow_all_producers``.
214            endpoint_config: Low-level iroh endpoint configuration.
215
216        Example::
217
218            @service(name="MyService", version=1)
219            class MyService:
220                @rpc()
221                async def hello(self, req: HelloRequest) -> HelloResponse:
222                    return HelloResponse(message=f"Hello {req.name}")
223
224            async with AsterServer(services=[MyService()]) as srv:
225                print(srv.address)
226                await srv.serve()
227        """
228        if not services:
229            raise ValueError("AsterServer requires at least one service")
230
231        # Auto-load config from env if none provided.
232        from .config import AsterConfig
233        if config is None:
234            config = AsterConfig.from_env()
235        if identity is not None:
236            config.identity_file = identity
237        self._config = config
238
239        # Inline overrides win over config.
240        self._allow_all_consumers = (
241            allow_all_consumers if allow_all_consumers is not None
242            else config.allow_all_consumers
243        )
244        self._allow_all_producers = (
245            allow_all_producers if allow_all_producers is not None
246            else config.allow_all_producers
247        )
248
249        # Load identity file if present (.aster-identity).
250        self._peer_name = peer
251        secret_key_from_identity, peer_entry = config.load_identity(
252            peer_name=peer, role="producer"
253        )
254        if peer_entry and not root_pubkey:
255            # Root pubkey comes from the credential in the identity file.
256            root_pubkey = bytes.fromhex(peer_entry["root_pubkey"])
257        if secret_key_from_identity and not config.secret_key:
258            import base64 as _b64
259            config.secret_key = secret_key_from_identity
260
261        # Resolve root public key: inline > identity file > config file > ephemeral.
262        # The root private key is NEVER on a running node (trust spec §1.1).
263        pub = config.resolve_root_pubkey()
264        self._root_pubkey = root_pubkey if root_pubkey is not None else pub
265
266        # Dev mode: if using an ephemeral root key (no explicit pubkey file),
267        # auto-open the consumer gate so the quickstart works without
268        # credential files. In production (explicit root_pubkey_file),
269        # the default allow_all_consumers=False requires credentials.
270        if (
271            config._ephemeral_privkey is not None
272            and allow_all_consumers is None
273            and config.root_pubkey_file is None
274        ):
275            self._allow_all_consumers = True
276            logger.info(
277                "Dev mode: allow_all_consumers=True (ephemeral root key). "
278                "Set ASTER_ROOT_PUBKEY_FILE for production admission."
279            )
280
281        if (not self._allow_all_consumers or not self._allow_all_producers) and self._root_pubkey is None:
282            raise ValueError(
283                "root_pubkey is required when admission is enabled "
284                "(allow_all_consumers=False or allow_all_producers=False). "
285                "Set ASTER_ROOT_PUBKEY_FILE or pass root_pubkey= explicitly."
286            )
287
288        self._services_in: list = list(services)
289        self._endpoint_config_template = endpoint_config or config.to_endpoint_config()
290        self._channel_name = channel_name
291        self._codec = codec
292        from aster.interceptors.deadline import DeadlineInterceptor
293        if interceptors is not None:
294            self._interceptors = list(interceptors)
295        else:
296            self._interceptors = [DeadlineInterceptor()]
297        self._hook = hook
298        self._nonce_store = nonce_store
299
300        # Admission → dispatch bridge: stores per-peer attributes
301        from aster.peer_store import PeerAttributeStore
302        self._peer_store = PeerAttributeStore()
303        self._registry_namespace = registry_namespace
304        self._mesh_state = mesh_state
305        self._clock_drift_config = clock_drift_config
306        self._persist_mesh_state = persist_mesh_state
307
308        # Populated by start()
309        self._started: bool = False
310        self._node: IrohNode | None = None
311        self._service_summaries: list[ServiceSummary] = []
312        self._server: Server | None = None
313        # Lazy caches for .blobs / .docs / .gossip
314        self._blobs: Any | None = None
315        self._docs: Any | None = None
316        self._gossip: Any | None = None
317
318        # Populated by serve()
319        self._serve_task: asyncio.Task | None = None
320        self._subtasks: list[asyncio.Task] = []
321        self._closed: bool = False
322
323        # Producer service tokens for @aster endpoint registration.
324        self._producer_tokens: dict[str, dict] = {}  # service_name -> token dict
325        self._load_producer_tokens()
326
327        # Delegation policies for aster.admission ALPN.
328        # Built from published_services entries that have aster_root_pubkey.
329        self._delegation_policies: dict[str, Any] = {}  # service_name -> policy
330        self._load_delegation_policies()

Create an Aster RPC server.

Interceptors are not wired by default. The server ships

with interceptors for rate limiting, deadline enforcement, auth, capability checks, circuit breaking, metrics, audit logging, and retry hints -- but none are active unless you pass them via the interceptors parameter. For production use, wire at minimum DeadlineInterceptor and RateLimitInterceptor. All interceptors live in aster.interceptors.

Args: services: List of @service-decorated class instances to serve. At least one is required. config: Optional AsterConfig for trust, storage, and networking settings. If omitted, settings are loaded from environment variables and defaults. peer: Optional peer name for this server (used in config lookup and identity file resolution). identity: Path to .aster-identity file (default: auto-detected from CWD). Overrides config.identity_file. root_pubkey: 32-byte ed25519 public key for the trust anchor. Overrides config.root_pubkey if both are set. allow_all_consumers: If True, skip consumer admission (open gate). Overrides config.allow_all_consumers. allow_all_producers: If True, skip producer admission. Overrides config.allow_all_producers. endpoint_config: Low-level iroh endpoint configuration.

Example::

@service(name="MyService", version=1)
class MyService:
    @rpc()
    async def hello(self, req: HelloRequest) -> HelloResponse:
        return HelloResponse(message=f"Hello {req.name}")

async with AsterServer(services=[MyService()]) as srv:
    print(srv.address)
    await srv.serve()
async def start(self) -> None:
334    async def start(self) -> None:
335        """Create the unified node and compute ``ServiceSummary`` list. Idempotent."""
336        if self._started:
337            return
338
339        # Configure structured logging from config (idempotent)
340        from aster.logging import configure_logging
341        configure_logging(
342            format=self._config.log_format,
343            level=self._config.log_level,
344            mask=self._config.log_mask,
345        )
346
347        # Determine which aster ALPNs to register on the Router.
348        # Consumer admission is ALWAYS registered -- even in open-gate mode
349        # the consumer uses it to discover services.
350        aster_alpns: list[bytes] = [RPC_ALPN, ALPN_CONSUMER_ADMISSION]
351        gate0_needed = False
352        if not self._allow_all_consumers:
353            if self._hook is None:
354                self._hook = MeshEndpointHook(peer_store=self._peer_store)
355            if self._nonce_store is None:
356                self._nonce_store = InMemoryNonceStore()
357            gate0_needed = True
358        if not self._allow_all_producers:
359            aster_alpns.append(ALPN_PRODUCER_ADMISSION)
360            if self._hook is None:
361                self._hook = MeshEndpointHook(peer_store=self._peer_store)
362            gate0_needed = True
363
364        # Build EndpointConfig so hooks (Gate 0) are installed when needed.
365        ep_cfg = _build_node_endpoint_config(
366            self._endpoint_config_template, enable_hooks=gate0_needed
367        )
368
369        self._node = await IrohNode.memory_with_alpns(aster_alpns, ep_cfg)
370        addr_b64 = base64.b64encode(
371            self._node.node_addr_info().to_bytes()
372        ).decode()
373
374        # Auto-create ephemeral MeshState. Even when allow_all_producers=True
375        # we need the topic_id so the root node's shell can observe gossip.
376        if self._mesh_state is None and self._root_pubkey is not None:
377            self._mesh_state = make_ephemeral_mesh_state(self._root_pubkey)
378
379        # Build ServiceSummary list with per-spec contract_id.
380        summaries: list[ServiceSummary] = []
381        for svc in self._services_in:
382            svc_cls = svc if inspect.isclass(svc) else type(svc)
383            info = getattr(svc_cls, "__aster_service_info__", None)
384            if info is None:
385                raise TypeError(
386                    f"{svc_cls!r} is not @service-decorated "
387                    f"(missing __aster_service_info__)"
388                )
389            cid = contract_id_from_service(svc_cls)
390            summaries.append(
391                ServiceSummary(
392                    name=info.name,
393                    version=info.version,
394                    contract_id=cid,
395                    channels={self._channel_name: addr_b64},
396                    pattern=info.scoped or "shared",
397                )
398            )
399        self._service_summaries = summaries
400
401        # ── C11.4.3: Startup publication verification ────────────────────
402        # If .aster/manifest.json exists, verify that each service's live
403        # contract_id matches the committed manifest. Fatal on mismatch.
404        manifest_path = os.path.join(os.getcwd(), ".aster", "manifest.json")
405        if os.path.isfile(manifest_path):
406            import json as _json
407            from .contract.manifest import ContractManifest
408
409            try:
410                with open(manifest_path, encoding="utf-8") as f:
411                    manifest_data = _json.load(f)
412            except Exception as exc:
413                raise RuntimeError(
414                    f"Failed to read manifest at {manifest_path}: {exc}"
415                ) from exc
416
417            # Support both single manifest and list of manifests
418            manifests_list = manifest_data if isinstance(manifest_data, list) else [manifest_data]
419            manifest_by_key: dict[tuple[str, int], ContractManifest] = {}
420            for md in manifests_list:
421                m = ContractManifest(**md)
422                manifest_by_key[(m.service, m.version)] = m
423
424            for summary in summaries:
425                key = (summary.name, summary.version)
426                manifest = manifest_by_key.get(key)
427                if manifest is None:
428                    continue  # Service not in manifest -- skip
429                if summary.contract_id != manifest.contract_id:
430                    raise RuntimeError(
431                        f"Contract identity mismatch for {summary.name!r} "
432                        f"v{summary.version}:\n"
433                        f"  Expected (manifest): {manifest.contract_id}\n"
434                        f"  Actual (live):       {summary.contract_id}\n"
435                        f"  Manifest: {manifest_path}\n"
436                        f"  -> The service interface has changed without "
437                        f"updating the manifest.\n"
438                        f"     Rerun `aster contract gen` and commit the "
439                        f"updated manifest."
440                    )
441            logger.debug("Manifest verification passed for %d service(s)", len(manifest_by_key))
442
443        # ── Contract publication to registry doc ─────────────────────────
444        # Create a registry doc, publish contract collections (with manifest
445        # including method schemas), and generate a read-only share ticket
446        # so consumers can discover full contract metadata.
447        await self._publish_contracts()
448
449        # ── Gate 3: capability interceptor & default-deny warning ────────
450        #
451        # Build a service_name -> ServiceInfo map for the CapabilityInterceptor
452        # and check whether any service has authorization configured.
453        from .interceptors.capability import CapabilityInterceptor
454
455        svc_map: dict[str, Any] = {}
456        any_has_requires = False
457        for svc in self._services_in:
458            svc_cls = svc if inspect.isclass(svc) else type(svc)
459            info = getattr(svc_cls, "__aster_service_info__", None)
460            if info is not None:
461                svc_map[info.name] = info
462                if info.requires is not None:
463                    any_has_requires = True
464                for mi in info.methods.values():
465                    if mi.requires is not None:
466                        any_has_requires = True
467
468        # Auto-wire the CapabilityInterceptor when trust is configured
469        # (Gate 0 enabled) or when any service declares requires.
470        has_auth_interceptor = any(
471            isinstance(i, CapabilityInterceptor) for i in self._interceptors
472        )
473        if (not self._allow_all_consumers or any_has_requires) and not has_auth_interceptor:
474            # Prepend so capability is checked before application interceptors.
475            self._interceptors.insert(0, CapabilityInterceptor(svc_map))
476
477        # S12.2: default-deny startup warning.
478        # When Gate 0 is disabled (allow_all_consumers=True), any service
479        # without explicit authorization is wide open.  Emit a warning so
480        # the developer knows.
481        if self._allow_all_consumers:
482            for svc in self._services_in:
483                svc_cls = svc if inspect.isclass(svc) else type(svc)
484                info = getattr(svc_cls, "__aster_service_info__", None)
485                if info is None:
486                    continue
487                if info.public:
488                    continue
489                svc_has_auth = info.requires is not None
490                if not svc_has_auth:
491                    svc_has_auth = any(
492                        mi.requires is not None for mi in info.methods.values()
493                    )
494                if not svc_has_auth:
495                    # Also check if user explicitly added an auth interceptor.
496                    from .interceptors.auth import AuthInterceptor
497                    has_explicit_auth = any(
498                        isinstance(i, AuthInterceptor) for i in self._interceptors
499                    )
500                    if not has_explicit_auth:
501                        warnings.warn(
502                            f"Service '{info.name}' has no authorization configured "
503                            f"and Gate 0 is disabled (allow_all_consumers=True). "
504                            f"All consumers can call this service without "
505                            f"authentication. Add @service(requires=...) or "
506                            f"configure an auth interceptor for production use.",
507                            UserWarning,
508                            stacklevel=2,
509                        )
510
511        # Server borrows a NetClient view of the node. AsterServer owns the
512        # node lifecycle, so Server must NOT close the endpoint on its own.
513        self._server = Server(
514            net_client(self._node),
515            services=self._services_in,
516            codec=self._codec,
517            interceptors=self._interceptors,
518            owns_endpoint=False,
519            peer_store=self._peer_store,
520        )
521
522        self._print_banner()
523        self._started = True
524
525        # Always log startup info (visible even when stderr is not a TTY)
526        service_names = ", ".join(s.name for s in self._service_summaries)
527        mode = "open-gate" if self._allow_all_consumers else "trusted"
528        logger.info("server starting runtime=python services=[%s] mode=%s", service_names, mode)

Create the unified node and compute ServiceSummary list. Idempotent.

def serve(self) -> _asyncio.Task:
747    def serve(self) -> asyncio.Task:
748        """Spawn the accept loop (+ Gate 0 hook loop); return an aggregate task.
749
750        ``await server.serve()`` blocks until cancellation. The second call
751        returns the same task (idempotent).
752        """
753        if self._serve_task is not None:
754            return self._serve_task
755        if not self._started:
756            raise RuntimeError("AsterServer.serve() called before start()")
757        assert self._server is not None
758        assert self._node is not None
759
760        subtasks: list[asyncio.Task] = []
761
762        # Gate 0 hook loop: drain the after-handshake channel, apply the
763        # MeshEndpointHook allowlist for every connection. before_connect is
764        # auto-accepted inside NodeHookReceiver (the peer's endpoint ID
765        # isn't authenticated at that stage).
766        if self._hook is not None and self._node.has_hooks():
767            self._hook_loop_task = asyncio.create_task(
768                self._run_gate0(), name="aster-gate0"
769            )
770            subtasks.append(self._hook_loop_task)
771
772        self._peer_store.start_reaper()
773
774        subtasks.append(
775            asyncio.create_task(self._accept_loop(), name="aster-accept")
776        )
777
778        # Delegated admission loop: accept connections on aster.admission
779        # ALPN for @aster-issued enrollment tokens.
780        if self._delegation_policies:
781            subtasks.append(
782                asyncio.create_task(
783                    self._delegated_admission_loop(), name="aster-delegated-admission"
784                )
785            )
786
787        # Auto-register endpoints with @aster for published services.
788        # Requires producer service tokens (from `aster publish`).
789        if self._producer_tokens:
790            subtasks.append(
791                asyncio.create_task(
792                    self._aster_registration_loop(), name="aster-registration"
793                )
794            )
795
796        self._subtasks = subtasks
797
798        async def _wait_all() -> None:
799            try:
800                await asyncio.gather(*subtasks, return_exceptions=True)
801            except asyncio.CancelledError:
802                # Graceful shutdown on Ctrl+C / task cancellation
803                logger.info("Server shutting down...")
804                for t in subtasks:
805                    t.cancel()
806                await asyncio.gather(*subtasks, return_exceptions=True)
807
808        self._serve_task = asyncio.create_task(_wait_all(), name="aster-server-serve")
809        return self._serve_task

Spawn the accept loop (+ Gate 0 hook loop); return an aggregate task.

await server.serve() blocks until cancellation. The second call returns the same task (idempotent).

async def drain(self, grace_period: float = 10.0) -> None:
883    async def drain(self, grace_period: float = 10.0) -> None:
884        """Graceful shutdown: stop accepting new connections, drain existing ones.
885
886        Compatible with Kubernetes ``preStop`` hooks and SIGTERM handling.
887        After drain completes, call ``close()`` to shut down the node.
888
889        Args:
890            grace_period: Seconds to wait for in-flight requests to complete.
891        """
892        logger.info("Draining server (grace_period=%.1fs)...", grace_period)
893        if self._server is not None:
894            await self._server.drain(grace_period)
895        logger.info("Drain complete")

Graceful shutdown: stop accepting new connections, drain existing ones.

Compatible with Kubernetes preStop hooks and SIGTERM handling. After drain completes, call close() to shut down the node.

Args: grace_period: Seconds to wait for in-flight requests to complete.

async def close(self) -> None:
897    async def close(self) -> None:
898        """Cancel serve loops and close the node. Safe to call multiple times."""
899        if self._closed:
900            return
901        self._closed = True
902
903        for t in self._subtasks:
904            t.cancel()
905        if self._serve_task is not None:
906            self._serve_task.cancel()
907            try:
908                await self._serve_task
909            except (asyncio.CancelledError, Exception):
910                pass
911
912        # Close the node -- this triggers router.shutdown() which closes all
913        # protocol handlers (including aster queue handlers) and the endpoint.
914        if self._node is not None:
915            try:
916                await self._node.close()
917            except Exception:
918                pass

Cancel serve loops and close the node. Safe to call multiple times.

address: str
1176    @property
1177    def address(self) -> str:
1178        """Connection address for this server (``aster1...`` ticket).
1179
1180        Pass this to ``AsterClient(address=...)`` or ``aster shell``
1181        to connect.  Includes relay address (if available) and direct
1182        addresses for LAN connectivity.
1183        """
1184        self._require_started()
1185        assert self._node is not None
1186        from . import AsterTicket
1187        addr_info = self._node.node_addr_info()
1188
1189        # Resolve relay URL to IP:port for the ticket
1190        relay_addr = None
1191        if addr_info.relay_url:
1192            relay_addr = _resolve_relay_addr(addr_info.relay_url)
1193
1194        t = AsterTicket(
1195            endpoint_id=addr_info.endpoint_id,
1196            relay_addr=relay_addr,
1197            direct_addrs=addr_info.direct_addresses or [],
1198        )
1199        return t.to_string()

Connection address for this server (aster1... ticket).

Pass this to AsterClient(address=...) or aster shell to connect. Includes relay address (if available) and direct addresses for LAN connectivity.

endpoint_id: str
1201    @property
1202    def endpoint_id(self) -> str:
1203        """Hex endpoint ID of this server's node."""
1204        self._require_started()
1205        assert self._node is not None
1206        return self._node.node_id()

Hex endpoint ID of this server's node.

services: list[aster.registry.models.ServiceSummary]
1244    @property
1245    def services(self) -> list[ServiceSummary]:
1246        """List of services hosted by this server."""
1247        self._require_started()
1248        return list(self._service_summaries)

List of services hosted by this server.

root_pubkey: bytes | None
1250    @property
1251    def root_pubkey(self) -> bytes | None:
1252        """The 32-byte ed25519 trust anchor public key, or ``None``."""
1253        return self._root_pubkey

The 32-byte ed25519 trust anchor public key, or None.

node: IrohNode
1257    @property
1258    def node(self) -> IrohNode:
1259        """The underlying ``IrohNode`` (escape hatch for direct iroh access)."""
1260        self._require_started()
1261        assert self._node is not None
1262        return self._node

The underlying IrohNode (escape hatch for direct iroh access).

blobs: Any
1264    @property
1265    def blobs(self) -> Any:
1266        """Blobs client backed by this node."""
1267        self._require_started()
1268        if self._blobs is None:
1269            self._blobs = blobs_client(self._node)
1270        return self._blobs

Blobs client backed by this node.

docs: Any
1272    @property
1273    def docs(self) -> Any:
1274        """Docs client backed by this node."""
1275        self._require_started()
1276        if self._docs is None:
1277            self._docs = docs_client(self._node)
1278        return self._docs

Docs client backed by this node.

gossip: Any
1280    @property
1281    def gossip(self) -> Any:
1282        """Gossip client backed by this node."""
1283        self._require_started()
1284        if self._gossip is None:
1285            self._gossip = gossip_client(self._node)
1286        return self._gossip

Gossip client backed by this node.

endpoint: Any
1289    @property
1290    def endpoint(self) -> Any:
1291        """Escape hatch: the ``NetClient`` view of this node's endpoint."""
1292        self._require_started()
1293        return net_client(self._node)

Escape hatch: the NetClient view of this node's endpoint.

class AsterClient:
1307class AsterClient:
1308    """High-level, declarative consumer.
1309
1310    Reads configuration from :class:`AsterConfig` (env vars / TOML file)
1311    just like :class:`AsterServer`.  In dev mode (no credentials, ephemeral
1312    producer), ``AsterClient()`` with just ``ASTER_ENDPOINT_ADDR`` set is
1313    enough.  In production, set ``ASTER_ENROLLMENT_CREDENTIAL`` to the
1314    path of a pre-signed token from the operator.
1315
1316    ``endpoint_addr`` may be a base64 ``NodeAddr`` string (as printed by
1317    :class:`AsterServer`), an ``EndpointId`` hex string (when discovery is
1318    enabled), a :class:`NodeAddr` object, or raw ``NodeAddr.to_bytes()``
1319    bytes.
1320    """
1321
1322    def __init__(
1323        self,
1324        *,
1325        config: "AsterConfig | None" = None,
1326        peer: str | None = None,
1327        identity: str | None = None,
1328        # Connection address (aster1... ticket, base64 NodeAddr, or hex EndpointId):
1329        address: str | None = None,
1330        # Back-compat alias for address:
1331        endpoint_addr: NodeAddr | str | bytes | None = None,
1332        root_pubkey: bytes | None = None,
1333        enrollment_credential_file: str | None = None,
1334        # Internal wiring:
1335        channel_name: str = "rpc",
1336    ) -> None:
1337        """Create an Aster RPC client.
1338
1339        Args:
1340            config: Optional :class:`AsterConfig`. If omitted, settings are
1341                loaded from environment variables.
1342            peer: Peer name for identity file lookup.
1343            identity: Path to ``.aster-identity`` file (default: auto-detected
1344                from CWD). Overrides ``config.identity_file``.
1345            address: The server's address. Accepts:
1346                - ``aster1...`` compact ticket (recommended)
1347                - Base64-encoded ``NodeAddr``
1348                - Hex ``EndpointId`` (requires discovery)
1349            endpoint_addr: Alias for *address* (back-compat).
1350            root_pubkey: 32-byte ed25519 public key of the server's trust
1351                anchor. Required for credential-based admission.
1352            enrollment_credential_file: Path to a pre-signed enrollment
1353                credential (``.cred`` file from ``aster enroll``).
1354
1355        Example::
1356
1357            # Dev mode -- open gate, no credentials
1358            client = AsterClient(address="aster1...")
1359            await client.connect()
1360
1361            # Production -- with credential
1362            client = AsterClient(
1363                address="aster1...",
1364                root_pubkey=pub_key,
1365                enrollment_credential_file="my-agent.cred",
1366            )
1367            await client.connect()
1368        """
1369        from .config import AsterConfig
1370
1371        if config is None:
1372            config = AsterConfig.from_env()
1373        if identity is not None:
1374            config.identity_file = identity
1375        self._config = config
1376        self._peer_name = peer
1377
1378        # Load identity file if present (.aster-identity).
1379        secret_key_from_identity, peer_entry = config.load_identity(
1380            peer_name=peer, role="consumer"
1381        )
1382
1383        # If the user only passed `enrollment_credential_file` (no separate
1384        # `identity=`), and that file is a TOML `.aster-identity` (which is
1385        # what `aster enroll node` produces), reach into the same file for
1386        # the [node] secret_key. Otherwise the QUIC endpoint id we generate
1387        # at startup won't match the one baked into the credential and the
1388        # server rejects admission with no useful error.
1389        #
1390        # This makes `enrollment_credential_file=` and `identity=` do the
1391        # same thing for the same TOML file -- mirrors how the TS binding's
1392        # AsterClientWrapper now treats `enrollmentCredentialFile`.
1393        cred_file_for_identity = enrollment_credential_file or config.enrollment_credential_file
1394        if (
1395            not secret_key_from_identity
1396            and not peer_entry
1397            and cred_file_for_identity
1398            and os.path.exists(os.path.expanduser(cred_file_for_identity))
1399        ):
1400            try:
1401                paired_secret, paired_peer = config.load_identity_from_path(
1402                    os.path.expanduser(cred_file_for_identity),
1403                    peer_name=peer,
1404                    role="consumer",
1405                )
1406                if paired_secret:
1407                    secret_key_from_identity = paired_secret
1408                if paired_peer:
1409                    peer_entry = paired_peer
1410            except Exception:
1411                # Best-effort: if the .cred isn't a TOML identity file
1412                # (e.g. it's a flat JSON credential), fall through to the
1413                # existing _load_enrollment_credential path which handles
1414                # both formats. The user just won't get the secret key
1415                # auto-loaded -- they'll need to pass `identity=` too.
1416                pass
1417
1418        if secret_key_from_identity and not config.secret_key:
1419            config.secret_key = secret_key_from_identity
1420        if peer_entry and not root_pubkey:
1421            root_pubkey = bytes.fromhex(peer_entry["root_pubkey"])
1422
1423        # Resolve endpoint address: address > endpoint_addr > config > error.
1424        addr = address or endpoint_addr or config.endpoint_addr
1425        if addr is None:
1426            raise ValueError(
1427                "AsterClient requires an endpoint address. "
1428                "Set ASTER_ENDPOINT_ADDR or pass endpoint_addr= explicitly."
1429            )
1430        self._endpoint_addr_in = addr
1431
1432        # Root pubkey (for optional response validation).
1433        pub = config.resolve_root_pubkey()
1434        self._root_pubkey = root_pubkey if root_pubkey is not None else pub
1435
1436        # Enrollment credential: identity file peer > inline > config > None.
1437        if peer_entry and not enrollment_credential_file:
1438            # The peer entry IS the credential -- write it to a temp file
1439            # that _load_enrollment_credential can read, or inline it.
1440            self._inline_credential = peer_entry
1441            self._enrollment_credential_file = None
1442        else:
1443            self._inline_credential = None
1444            self._enrollment_credential_file = (
1445                enrollment_credential_file or config.enrollment_credential_file
1446            )
1447        self._enrollment_credential_iid = config.enrollment_credential_iid
1448        self._channel_name = channel_name
1449
1450        self._node: Any | None = None
1451        self._ep: Any | None = None
1452        self._services: list[ServiceSummary] = []
1453        self._registry_namespace: str = ""
1454        self._gossip_topic: str = ""
1455        self._open_gate: bool = False
1456        self._rpc_conns: dict[str, Any] = {}
1457        self._clients: list[ServiceClient] = []
1458        self._connected: bool = False
1459        self._closed: bool = False
1460        self._reconnect_attempts: int = 0
1461        self._max_reconnect_attempts: int = 5
1462        self._reconnect_base_delay: float = 1.0  # seconds
1463
1464    async def connect(self) -> None:
1465        """Create endpoint, run admission if credential present, store services.
1466
1467        Idempotent -- second call is a no-op.
1468        """
1469        if self._connected:
1470            return
1471
1472        # Build a full IrohNode so the consumer can join registry docs
1473        # and fetch blobs. Use persistent storage when configured -- this
1474        # preserves the node identity, joined docs, and downloaded blobs
1475        # across restarts.
1476        ep_cfg = self._config.to_endpoint_config()
1477        ep_config = _clone_config_with_alpns(
1478            ep_cfg, [ALPN_CONSUMER_ADMISSION, RPC_ALPN]
1479        )
1480        storage = self._config.storage_path
1481        if storage:
1482            self._node = await IrohNode.persistent_with_alpns(
1483                storage, [ALPN_CONSUMER_ADMISSION, RPC_ALPN], ep_config
1484            )
1485            logger.debug("Consumer node: persistent at %s", storage)
1486        else:
1487            self._node = await IrohNode.memory_with_alpns(
1488                [ALPN_CONSUMER_ADMISSION, RPC_ALPN], ep_config
1489            )
1490            logger.debug("Consumer node: in-memory (set ASTER_STORAGE_PATH for persistence)")
1491        self._ep = net_client(self._node)
1492
1493        logger.debug(
1494            "Consumer node ready: endpoint_id=%s",
1495            self._node.node_addr_info().endpoint_id[:16] + "…",
1496        )
1497
1498        # Always run the admission handshake -- even when the consumer gate
1499        # is open, the response carries the services list + registry ticket.
1500        await self._run_admission()
1501        self._connected = True
1502        self._reconnect_attempts = 0
1503
1504    async def reconnect(self) -> None:
1505        """Reconnect after a connection drop.
1506
1507        Closes stale connections, re-runs admission, and rebuilds the
1508        services list. Uses exponential backoff on repeated failures.
1509        """
1510        self._rpc_conns.clear()
1511        self._clients.clear()
1512        self._connected = False
1513
1514        for attempt in range(self._max_reconnect_attempts):
1515            try:
1516                delay = self._reconnect_base_delay * (2 ** attempt)
1517                if attempt > 0:
1518                    logger.info(
1519                        "Reconnect attempt %d/%d (delay %.1fs)",
1520                        attempt + 1, self._max_reconnect_attempts, delay,
1521                    )
1522                    await asyncio.sleep(delay)
1523
1524                await self._run_admission()
1525                self._connected = True
1526                self._reconnect_attempts = 0
1527                logger.info("Reconnected successfully")
1528                return
1529
1530            except Exception as exc:
1531                logger.warning("Reconnect attempt %d failed: %s", attempt + 1, exc)
1532
1533        raise ConnectionError(
1534            f"Failed to reconnect after {self._max_reconnect_attempts} attempts"
1535        )
1536
1537    async def _run_admission(self) -> None:
1538        """Connect via ``aster.consumer_admission`` to get services.
1539
1540        If an enrollment credential is configured, it's presented for
1541        verification.  If not (dev mode / open gate), an empty credential
1542        is sent -- the producer auto-admits when ``allow_all_consumers=True``.
1543        """
1544        assert self._ep is not None
1545
1546        # Build credential from: inline peer entry > credential file > empty.
1547        credential_file: str | None = None
1548        if self._inline_credential:
1549            cred = _credential_from_peer_entry(self._inline_credential)
1550            cred_json = consumer_cred_to_json(cred)
1551            credential_file = "<inline .aster-identity peer entry>"
1552        elif self._enrollment_credential_file:
1553            cred = _load_enrollment_credential(self._enrollment_credential_file)
1554            cred_json = consumer_cred_to_json(cred)
1555            credential_file = self._enrollment_credential_file
1556        else:
1557            # No credential -- dev mode / open-gate flow.
1558            cred_json = ""
1559
1560        iid_token = self._enrollment_credential_iid or ""
1561
1562        target = _coerce_node_addr(self._endpoint_addr_in)
1563        conn = await self._ep.connect_node_addr(target, ALPN_CONSUMER_ADMISSION)
1564        send, recv = await conn.open_bi()
1565        req = ConsumerAdmissionRequest(
1566            credential_json=cred_json,
1567            iid_token=iid_token,
1568        )
1569        await send.write_all(req.to_json().encode())
1570        await send.finish()
1571        raw = await recv.read_to_end(64 * 1024)
1572        resp = ConsumerAdmissionResponse.from_json(raw)
1573        if not resp.admitted:
1574            our_endpoint_id = ""
1575            try:
1576                if self._node is not None:
1577                    our_endpoint_id = self._node.node_addr_info().endpoint_id
1578            except Exception:
1579                pass
1580            raise AdmissionDeniedError(
1581                had_credential=bool(cred_json),
1582                credential_file=credential_file,
1583                our_endpoint_id=our_endpoint_id,
1584                server_address=str(self._endpoint_addr_in),
1585            )
1586
1587        # If the admission succeeded without us presenting a credential,
1588        # the server must be running in open-gate mode (allow_all_consumers).
1589        # The shell uses this to suppress noisy "Identity not configured"
1590        # banners that are irrelevant on open-gate servers.
1591        self._open_gate = not cred_json
1592
1593        self._services = list(resp.services)
1594        self._registry_namespace = resp.registry_namespace or ""
1595        self._gossip_topic = resp.gossip_topic or ""
1596        logger.info(
1597            "Admitted -- services: %s, registry_namespace: %s, gossip_topic: %s",
1598            [s.name for s in self._services],
1599            bool(self._registry_namespace),
1600            bool(self._gossip_topic),
1601        )
1602
1603    async def _rpc_conn_for(self, rpc_addr_b64: str) -> Any:
1604        if rpc_addr_b64 in self._rpc_conns:
1605            return self._rpc_conns[rpc_addr_b64]
1606        assert self._ep is not None
1607        rpc_addr = _coerce_node_addr(rpc_addr_b64)
1608        conn = await self._ep.connect_node_addr(rpc_addr, RPC_ALPN)
1609        self._rpc_conns[rpc_addr_b64] = conn
1610        return conn
1611
1612    async def client(
1613        self,
1614        service_cls: type,
1615        *,
1616        channel: str | None = None,
1617        codec: Any | None = None,
1618        interceptors: list[Any] | None = None,
1619    ) -> ServiceClient:
1620        """Return an RPC client for ``service_cls``."""
1621        if not self._connected:
1622            raise RuntimeError("AsterClient not connected; call connect() first")
1623
1624        info = getattr(service_cls, "__aster_service_info__", None)
1625        if info is None:
1626            raise TypeError(
1627                f"{service_cls!r} is not @service-decorated "
1628                f"(missing __aster_service_info__)"
1629            )
1630
1631        summary: ServiceSummary | None = None
1632        for s in self._services:
1633            if s.name == info.name and s.version == info.version:
1634                summary = s
1635                break
1636        if summary is None:
1637            raise LookupError(
1638                f"service {info.name!r} v{info.version} not offered by producer "
1639                f"(got: {[(s.name, s.version) for s in self._services]})"
1640            )
1641
1642        channel_key = channel or self._channel_name
1643        if channel_key not in summary.channels:
1644            raise LookupError(
1645                f"service {info.name!r} has no channel {channel_key!r} "
1646                f"(available: {list(summary.channels)})"
1647            )
1648
1649        conn = await self._rpc_conn_for(summary.channels[channel_key])
1650
1651        # If the server advertises JSON-only (e.g. the TypeScript binding,
1652        # whose Fory implementation is not yet XLANG-compliant), pick the
1653        # JSON proxy codec automatically -- otherwise the typed Fory client
1654        # would send Fory bytes the server can't decode and the call would
1655        # fail with an opaque "Expected RpcStatus, got NoneType".
1656        if codec is None:
1657            modes = list(getattr(summary, "serialization_modes", None) or [])
1658            if modes and "xlang" not in modes and "json" in modes:
1659                from aster.json_codec import JsonProxyCodec
1660                codec = JsonProxyCodec()
1661
1662        # Session-scoped services use the session protocol (one bidi stream
1663        # with method multiplexing). Dispatch to create_session.
1664        if info.scoped == RpcScope.SESSION:
1665            from aster.session import create_session
1666            client = await create_session(
1667                service_cls,
1668                connection=conn,
1669                codec=codec,
1670                interceptors=interceptors,
1671            )
1672        else:
1673            client = create_client(
1674                service_cls,
1675                connection=conn,
1676                codec=codec,
1677                interceptors=interceptors,
1678            )
1679        self._clients.append(client)
1680        return client
1681
1682    async def close(self) -> None:
1683        if self._closed:
1684            return
1685        self._closed = True
1686
1687        for c in self._clients:
1688            try:
1689                await c.close()
1690            except Exception:
1691                pass
1692        self._clients.clear()
1693        self._rpc_conns.clear()
1694
1695        if self._node is not None:
1696            try:
1697                await self._node.shutdown()
1698            except Exception:
1699                pass
1700        elif self._ep is not None:
1701            try:
1702                await self._ep.close()
1703            except Exception:
1704                pass
1705
1706    async def __aenter__(self) -> "AsterClient":
1707        await self.connect()
1708        return self
1709
1710    async def __aexit__(self, exc_type, exc, tb) -> None:
1711        await self.close()
1712
1713    @property
1714    def services(self) -> list[ServiceSummary]:
1715        return list(self._services)
1716
1717    @property
1718    def registry_namespace(self) -> str:
1719        """64-char hex namespace_id for the registry doc.
1720
1721        Empty string if no registry doc was provided by the producer.
1722        """
1723        return self._registry_namespace
1724
1725    @property
1726    def open_gate(self) -> bool:
1727        """True if the server admitted this client without a credential.
1728
1729        When True, the server is running with ``allow_all_consumers=True``
1730        and no credential was required. Useful for clients (e.g. the shell)
1731        that want to suppress identity-related banners that are irrelevant
1732        on open-gate servers.
1733        """
1734        return self._open_gate
1735
1736    def proxy(self, service_name: str) -> "ProxyClient":
1737        """Create a dynamic proxy client for a shared (stream-per-call) service.
1738
1739        The proxy discovers methods from the service contract and builds
1740        method stubs at runtime. No local type definitions needed -- call
1741        methods with dicts and receive dicts back::
1742
1743            mc = client.proxy("MissionControl")
1744            result = await mc.getStatus({"agent_id": "edge-1"})
1745            print(result["status"])
1746
1747        For session-scoped services, use :meth:`session` instead.
1748
1749        Args:
1750            service_name: The service name (e.g., ``"MissionControl"``).
1751
1752        Returns:
1753            A :class:`ProxyClient` with method stubs for each RPC method.
1754
1755        Raises:
1756            TypeError: If the service is session-scoped (use ``session()``).
1757        """
1758        if not self._connected:
1759            raise RuntimeError("AsterClient not connected; call connect() first")
1760
1761        summary = self._find_service(service_name)
1762
1763        if getattr(summary, "pattern", "shared") == "session":
1764            raise TypeError(
1765                f"'{service_name}' is session-scoped. "
1766                f"Use 'await client.session(\"{service_name}\")' instead of "
1767                f"'client.proxy(\"{service_name}\")'."
1768            )
1769
1770        return ProxyClient(service_name=service_name, aster_client=self)
1771
1772    async def session(self, service_name: str) -> "SessionProxyClient":
1773        """Create a dynamic proxy client for a session-scoped service.
1774
1775        Opens a single bidirectional QUIC stream and multiplexes calls
1776        over it. Maintains a lock to ensure one call in flight at a time
1777        (spec requirement). Call methods with dicts, receive dicts::
1778
1779            agent = await client.session("AgentSession")
1780            result = await agent.register({"agent_id": "edge-1"})
1781            print(result["assignment"])
1782            await agent.close()
1783
1784        For shared (stream-per-call) services, use :meth:`proxy` instead.
1785
1786        Args:
1787            service_name: The service name (e.g., ``"AgentSession"``).
1788
1789        Returns:
1790            A session proxy with method stubs. Must be closed when done.
1791        """
1792        if not self._connected:
1793            raise RuntimeError("AsterClient not connected; call connect() first")
1794
1795        summary = self._find_service(service_name)
1796
1797        channel_key = self._channel_name
1798        if channel_key not in summary.channels:
1799            channel_key = next(iter(summary.channels), self._channel_name)
1800
1801        conn = await self._rpc_conn_for(summary.channels.get(channel_key, ""))
1802
1803        codec = None
1804        modes = list(getattr(summary, "serialization_modes", None) or [])
1805        if modes and "xlang" not in modes and "json" in modes:
1806            from aster.json_codec import JsonProxyCodec
1807            codec = JsonProxyCodec()
1808
1809        from aster.session import create_proxy_session
1810        session_client = await create_proxy_session(
1811            service_name=service_name,
1812            connection=conn,
1813            codec=codec,
1814            aster_client=self,
1815        )
1816        self._clients.append(session_client)
1817        return session_client
1818
1819    def _find_service(self, service_name: str) -> "ServiceSummary":
1820        """Look up a service by name in the admission response."""
1821        for s in self._services:
1822            if s.name == service_name:
1823                return s
1824        available = [s.name for s in self._services]
1825        raise ValueError(
1826            f"Service '{service_name}' not found. "
1827            f"Available: {available}"
1828        )
1829
1830    @property
1831    def gossip_topic(self) -> str:
1832        """Hex-encoded 32-byte gossip topic ID for the producer mesh.
1833
1834        Only populated when the connecting consumer is the root node
1835        (endpoint_id == root_pubkey). Empty string otherwise.
1836        """
1837        return self._gossip_topic

High-level, declarative consumer.

Reads configuration from AsterConfig (env vars / TOML file) just like AsterServer. In dev mode (no credentials, ephemeral producer), AsterClient() with just ASTER_ENDPOINT_ADDR set is enough. In production, set ASTER_ENROLLMENT_CREDENTIAL to the path of a pre-signed token from the operator.

endpoint_addr may be a base64 NodeAddr string (as printed by AsterServer), an EndpointId hex string (when discovery is enabled), a NodeAddr object, or raw NodeAddr.to_bytes() bytes.

AsterClient( *, config: aster.config.AsterConfig | None = None, peer: str | None = None, identity: str | None = None, address: str | None = None, endpoint_addr: NodeAddr | str | bytes | None = None, root_pubkey: bytes | None = None, enrollment_credential_file: str | None = None, channel_name: str = 'rpc')
1322    def __init__(
1323        self,
1324        *,
1325        config: "AsterConfig | None" = None,
1326        peer: str | None = None,
1327        identity: str | None = None,
1328        # Connection address (aster1... ticket, base64 NodeAddr, or hex EndpointId):
1329        address: str | None = None,
1330        # Back-compat alias for address:
1331        endpoint_addr: NodeAddr | str | bytes | None = None,
1332        root_pubkey: bytes | None = None,
1333        enrollment_credential_file: str | None = None,
1334        # Internal wiring:
1335        channel_name: str = "rpc",
1336    ) -> None:
1337        """Create an Aster RPC client.
1338
1339        Args:
1340            config: Optional :class:`AsterConfig`. If omitted, settings are
1341                loaded from environment variables.
1342            peer: Peer name for identity file lookup.
1343            identity: Path to ``.aster-identity`` file (default: auto-detected
1344                from CWD). Overrides ``config.identity_file``.
1345            address: The server's address. Accepts:
1346                - ``aster1...`` compact ticket (recommended)
1347                - Base64-encoded ``NodeAddr``
1348                - Hex ``EndpointId`` (requires discovery)
1349            endpoint_addr: Alias for *address* (back-compat).
1350            root_pubkey: 32-byte ed25519 public key of the server's trust
1351                anchor. Required for credential-based admission.
1352            enrollment_credential_file: Path to a pre-signed enrollment
1353                credential (``.cred`` file from ``aster enroll``).
1354
1355        Example::
1356
1357            # Dev mode -- open gate, no credentials
1358            client = AsterClient(address="aster1...")
1359            await client.connect()
1360
1361            # Production -- with credential
1362            client = AsterClient(
1363                address="aster1...",
1364                root_pubkey=pub_key,
1365                enrollment_credential_file="my-agent.cred",
1366            )
1367            await client.connect()
1368        """
1369        from .config import AsterConfig
1370
1371        if config is None:
1372            config = AsterConfig.from_env()
1373        if identity is not None:
1374            config.identity_file = identity
1375        self._config = config
1376        self._peer_name = peer
1377
1378        # Load identity file if present (.aster-identity).
1379        secret_key_from_identity, peer_entry = config.load_identity(
1380            peer_name=peer, role="consumer"
1381        )
1382
1383        # If the user only passed `enrollment_credential_file` (no separate
1384        # `identity=`), and that file is a TOML `.aster-identity` (which is
1385        # what `aster enroll node` produces), reach into the same file for
1386        # the [node] secret_key. Otherwise the QUIC endpoint id we generate
1387        # at startup won't match the one baked into the credential and the
1388        # server rejects admission with no useful error.
1389        #
1390        # This makes `enrollment_credential_file=` and `identity=` do the
1391        # same thing for the same TOML file -- mirrors how the TS binding's
1392        # AsterClientWrapper now treats `enrollmentCredentialFile`.
1393        cred_file_for_identity = enrollment_credential_file or config.enrollment_credential_file
1394        if (
1395            not secret_key_from_identity
1396            and not peer_entry
1397            and cred_file_for_identity
1398            and os.path.exists(os.path.expanduser(cred_file_for_identity))
1399        ):
1400            try:
1401                paired_secret, paired_peer = config.load_identity_from_path(
1402                    os.path.expanduser(cred_file_for_identity),
1403                    peer_name=peer,
1404                    role="consumer",
1405                )
1406                if paired_secret:
1407                    secret_key_from_identity = paired_secret
1408                if paired_peer:
1409                    peer_entry = paired_peer
1410            except Exception:
1411                # Best-effort: if the .cred isn't a TOML identity file
1412                # (e.g. it's a flat JSON credential), fall through to the
1413                # existing _load_enrollment_credential path which handles
1414                # both formats. The user just won't get the secret key
1415                # auto-loaded -- they'll need to pass `identity=` too.
1416                pass
1417
1418        if secret_key_from_identity and not config.secret_key:
1419            config.secret_key = secret_key_from_identity
1420        if peer_entry and not root_pubkey:
1421            root_pubkey = bytes.fromhex(peer_entry["root_pubkey"])
1422
1423        # Resolve endpoint address: address > endpoint_addr > config > error.
1424        addr = address or endpoint_addr or config.endpoint_addr
1425        if addr is None:
1426            raise ValueError(
1427                "AsterClient requires an endpoint address. "
1428                "Set ASTER_ENDPOINT_ADDR or pass endpoint_addr= explicitly."
1429            )
1430        self._endpoint_addr_in = addr
1431
1432        # Root pubkey (for optional response validation).
1433        pub = config.resolve_root_pubkey()
1434        self._root_pubkey = root_pubkey if root_pubkey is not None else pub
1435
1436        # Enrollment credential: identity file peer > inline > config > None.
1437        if peer_entry and not enrollment_credential_file:
1438            # The peer entry IS the credential -- write it to a temp file
1439            # that _load_enrollment_credential can read, or inline it.
1440            self._inline_credential = peer_entry
1441            self._enrollment_credential_file = None
1442        else:
1443            self._inline_credential = None
1444            self._enrollment_credential_file = (
1445                enrollment_credential_file or config.enrollment_credential_file
1446            )
1447        self._enrollment_credential_iid = config.enrollment_credential_iid
1448        self._channel_name = channel_name
1449
1450        self._node: Any | None = None
1451        self._ep: Any | None = None
1452        self._services: list[ServiceSummary] = []
1453        self._registry_namespace: str = ""
1454        self._gossip_topic: str = ""
1455        self._open_gate: bool = False
1456        self._rpc_conns: dict[str, Any] = {}
1457        self._clients: list[ServiceClient] = []
1458        self._connected: bool = False
1459        self._closed: bool = False
1460        self._reconnect_attempts: int = 0
1461        self._max_reconnect_attempts: int = 5
1462        self._reconnect_base_delay: float = 1.0  # seconds

Create an Aster RPC client.

Args: config: Optional AsterConfig. If omitted, settings are loaded from environment variables. peer: Peer name for identity file lookup. identity: Path to .aster-identity file (default: auto-detected from CWD). Overrides config.identity_file. address: The server's address. Accepts: - aster1... compact ticket (recommended) - Base64-encoded NodeAddr - Hex EndpointId (requires discovery) endpoint_addr: Alias for address (back-compat). root_pubkey: 32-byte ed25519 public key of the server's trust anchor. Required for credential-based admission. enrollment_credential_file: Path to a pre-signed enrollment credential (.cred file from aster enroll).

Example::

# Dev mode -- open gate, no credentials
client = AsterClient(address="aster1...")
await client.connect()

# Production -- with credential
client = AsterClient(
    address="aster1...",
    root_pubkey=pub_key,
    enrollment_credential_file="my-agent.cred",
)
await client.connect()
async def connect(self) -> None:
1464    async def connect(self) -> None:
1465        """Create endpoint, run admission if credential present, store services.
1466
1467        Idempotent -- second call is a no-op.
1468        """
1469        if self._connected:
1470            return
1471
1472        # Build a full IrohNode so the consumer can join registry docs
1473        # and fetch blobs. Use persistent storage when configured -- this
1474        # preserves the node identity, joined docs, and downloaded blobs
1475        # across restarts.
1476        ep_cfg = self._config.to_endpoint_config()
1477        ep_config = _clone_config_with_alpns(
1478            ep_cfg, [ALPN_CONSUMER_ADMISSION, RPC_ALPN]
1479        )
1480        storage = self._config.storage_path
1481        if storage:
1482            self._node = await IrohNode.persistent_with_alpns(
1483                storage, [ALPN_CONSUMER_ADMISSION, RPC_ALPN], ep_config
1484            )
1485            logger.debug("Consumer node: persistent at %s", storage)
1486        else:
1487            self._node = await IrohNode.memory_with_alpns(
1488                [ALPN_CONSUMER_ADMISSION, RPC_ALPN], ep_config
1489            )
1490            logger.debug("Consumer node: in-memory (set ASTER_STORAGE_PATH for persistence)")
1491        self._ep = net_client(self._node)
1492
1493        logger.debug(
1494            "Consumer node ready: endpoint_id=%s",
1495            self._node.node_addr_info().endpoint_id[:16] + "…",
1496        )
1497
1498        # Always run the admission handshake -- even when the consumer gate
1499        # is open, the response carries the services list + registry ticket.
1500        await self._run_admission()
1501        self._connected = True
1502        self._reconnect_attempts = 0

Create endpoint, run admission if credential present, store services.

Idempotent -- second call is a no-op.

async def reconnect(self) -> None:
1504    async def reconnect(self) -> None:
1505        """Reconnect after a connection drop.
1506
1507        Closes stale connections, re-runs admission, and rebuilds the
1508        services list. Uses exponential backoff on repeated failures.
1509        """
1510        self._rpc_conns.clear()
1511        self._clients.clear()
1512        self._connected = False
1513
1514        for attempt in range(self._max_reconnect_attempts):
1515            try:
1516                delay = self._reconnect_base_delay * (2 ** attempt)
1517                if attempt > 0:
1518                    logger.info(
1519                        "Reconnect attempt %d/%d (delay %.1fs)",
1520                        attempt + 1, self._max_reconnect_attempts, delay,
1521                    )
1522                    await asyncio.sleep(delay)
1523
1524                await self._run_admission()
1525                self._connected = True
1526                self._reconnect_attempts = 0
1527                logger.info("Reconnected successfully")
1528                return
1529
1530            except Exception as exc:
1531                logger.warning("Reconnect attempt %d failed: %s", attempt + 1, exc)
1532
1533        raise ConnectionError(
1534            f"Failed to reconnect after {self._max_reconnect_attempts} attempts"
1535        )

Reconnect after a connection drop.

Closes stale connections, re-runs admission, and rebuilds the services list. Uses exponential backoff on repeated failures.

async def client( self, service_cls: type, *, channel: str | None = None, codec: typing.Any | None = None, interceptors: list[typing.Any] | None = None) -> aster.client.ServiceClient:
1612    async def client(
1613        self,
1614        service_cls: type,
1615        *,
1616        channel: str | None = None,
1617        codec: Any | None = None,
1618        interceptors: list[Any] | None = None,
1619    ) -> ServiceClient:
1620        """Return an RPC client for ``service_cls``."""
1621        if not self._connected:
1622            raise RuntimeError("AsterClient not connected; call connect() first")
1623
1624        info = getattr(service_cls, "__aster_service_info__", None)
1625        if info is None:
1626            raise TypeError(
1627                f"{service_cls!r} is not @service-decorated "
1628                f"(missing __aster_service_info__)"
1629            )
1630
1631        summary: ServiceSummary | None = None
1632        for s in self._services:
1633            if s.name == info.name and s.version == info.version:
1634                summary = s
1635                break
1636        if summary is None:
1637            raise LookupError(
1638                f"service {info.name!r} v{info.version} not offered by producer "
1639                f"(got: {[(s.name, s.version) for s in self._services]})"
1640            )
1641
1642        channel_key = channel or self._channel_name
1643        if channel_key not in summary.channels:
1644            raise LookupError(
1645                f"service {info.name!r} has no channel {channel_key!r} "
1646                f"(available: {list(summary.channels)})"
1647            )
1648
1649        conn = await self._rpc_conn_for(summary.channels[channel_key])
1650
1651        # If the server advertises JSON-only (e.g. the TypeScript binding,
1652        # whose Fory implementation is not yet XLANG-compliant), pick the
1653        # JSON proxy codec automatically -- otherwise the typed Fory client
1654        # would send Fory bytes the server can't decode and the call would
1655        # fail with an opaque "Expected RpcStatus, got NoneType".
1656        if codec is None:
1657            modes = list(getattr(summary, "serialization_modes", None) or [])
1658            if modes and "xlang" not in modes and "json" in modes:
1659                from aster.json_codec import JsonProxyCodec
1660                codec = JsonProxyCodec()
1661
1662        # Session-scoped services use the session protocol (one bidi stream
1663        # with method multiplexing). Dispatch to create_session.
1664        if info.scoped == RpcScope.SESSION:
1665            from aster.session import create_session
1666            client = await create_session(
1667                service_cls,
1668                connection=conn,
1669                codec=codec,
1670                interceptors=interceptors,
1671            )
1672        else:
1673            client = create_client(
1674                service_cls,
1675                connection=conn,
1676                codec=codec,
1677                interceptors=interceptors,
1678            )
1679        self._clients.append(client)
1680        return client

Return an RPC client for service_cls.

registry_namespace: str
1717    @property
1718    def registry_namespace(self) -> str:
1719        """64-char hex namespace_id for the registry doc.
1720
1721        Empty string if no registry doc was provided by the producer.
1722        """
1723        return self._registry_namespace

64-char hex namespace_id for the registry doc.

Empty string if no registry doc was provided by the producer.

open_gate: bool
1725    @property
1726    def open_gate(self) -> bool:
1727        """True if the server admitted this client without a credential.
1728
1729        When True, the server is running with ``allow_all_consumers=True``
1730        and no credential was required. Useful for clients (e.g. the shell)
1731        that want to suppress identity-related banners that are irrelevant
1732        on open-gate servers.
1733        """
1734        return self._open_gate

True if the server admitted this client without a credential.

When True, the server is running with allow_all_consumers=True and no credential was required. Useful for clients (e.g. the shell) that want to suppress identity-related banners that are irrelevant on open-gate servers.

def proxy(self, service_name: str) -> aster.runtime.ProxyClient:
1736    def proxy(self, service_name: str) -> "ProxyClient":
1737        """Create a dynamic proxy client for a shared (stream-per-call) service.
1738
1739        The proxy discovers methods from the service contract and builds
1740        method stubs at runtime. No local type definitions needed -- call
1741        methods with dicts and receive dicts back::
1742
1743            mc = client.proxy("MissionControl")
1744            result = await mc.getStatus({"agent_id": "edge-1"})
1745            print(result["status"])
1746
1747        For session-scoped services, use :meth:`session` instead.
1748
1749        Args:
1750            service_name: The service name (e.g., ``"MissionControl"``).
1751
1752        Returns:
1753            A :class:`ProxyClient` with method stubs for each RPC method.
1754
1755        Raises:
1756            TypeError: If the service is session-scoped (use ``session()``).
1757        """
1758        if not self._connected:
1759            raise RuntimeError("AsterClient not connected; call connect() first")
1760
1761        summary = self._find_service(service_name)
1762
1763        if getattr(summary, "pattern", "shared") == "session":
1764            raise TypeError(
1765                f"'{service_name}' is session-scoped. "
1766                f"Use 'await client.session(\"{service_name}\")' instead of "
1767                f"'client.proxy(\"{service_name}\")'."
1768            )
1769
1770        return ProxyClient(service_name=service_name, aster_client=self)

Create a dynamic proxy client for a shared (stream-per-call) service.

The proxy discovers methods from the service contract and builds method stubs at runtime. No local type definitions needed -- call methods with dicts and receive dicts back::

mc = client.proxy("MissionControl")
result = await mc.getStatus({"agent_id": "edge-1"})
print(result["status"])

For session-scoped services, use session() instead.

Args: service_name: The service name (e.g., "MissionControl").

Returns: A ProxyClient with method stubs for each RPC method.

Raises: TypeError: If the service is session-scoped (use session()).

async def session(self, service_name: str) -> "'SessionProxyClient'":
1772    async def session(self, service_name: str) -> "SessionProxyClient":
1773        """Create a dynamic proxy client for a session-scoped service.
1774
1775        Opens a single bidirectional QUIC stream and multiplexes calls
1776        over it. Maintains a lock to ensure one call in flight at a time
1777        (spec requirement). Call methods with dicts, receive dicts::
1778
1779            agent = await client.session("AgentSession")
1780            result = await agent.register({"agent_id": "edge-1"})
1781            print(result["assignment"])
1782            await agent.close()
1783
1784        For shared (stream-per-call) services, use :meth:`proxy` instead.
1785
1786        Args:
1787            service_name: The service name (e.g., ``"AgentSession"``).
1788
1789        Returns:
1790            A session proxy with method stubs. Must be closed when done.
1791        """
1792        if not self._connected:
1793            raise RuntimeError("AsterClient not connected; call connect() first")
1794
1795        summary = self._find_service(service_name)
1796
1797        channel_key = self._channel_name
1798        if channel_key not in summary.channels:
1799            channel_key = next(iter(summary.channels), self._channel_name)
1800
1801        conn = await self._rpc_conn_for(summary.channels.get(channel_key, ""))
1802
1803        codec = None
1804        modes = list(getattr(summary, "serialization_modes", None) or [])
1805        if modes and "xlang" not in modes and "json" in modes:
1806            from aster.json_codec import JsonProxyCodec
1807            codec = JsonProxyCodec()
1808
1809        from aster.session import create_proxy_session
1810        session_client = await create_proxy_session(
1811            service_name=service_name,
1812            connection=conn,
1813            codec=codec,
1814            aster_client=self,
1815        )
1816        self._clients.append(session_client)
1817        return session_client

Create a dynamic proxy client for a session-scoped service.

Opens a single bidirectional QUIC stream and multiplexes calls over it. Maintains a lock to ensure one call in flight at a time (spec requirement). Call methods with dicts, receive dicts::

agent = await client.session("AgentSession")
result = await agent.register({"agent_id": "edge-1"})
print(result["assignment"])
await agent.close()

For shared (stream-per-call) services, use proxy() instead.

Args: service_name: The service name (e.g., "AgentSession").

Returns: A session proxy with method stubs. Must be closed when done.

gossip_topic: str
1830    @property
1831    def gossip_topic(self) -> str:
1832        """Hex-encoded 32-byte gossip topic ID for the producer mesh.
1833
1834        Only populated when the connecting consumer is the root node
1835        (endpoint_id == root_pubkey). Empty string otherwise.
1836        """
1837        return self._gossip_topic

Hex-encoded 32-byte gossip topic ID for the producer mesh.

Only populated when the connecting consumer is the root node (endpoint_id == root_pubkey). Empty string otherwise.

@dataclass
class AsterConfig:
276@dataclass
277class AsterConfig:
278    """Unified configuration for :class:`AsterServer`.
279
280    Combines trust (root public key, admission policy), storage (memory vs
281    persistent), and networking (relay, bind address, etc.) into one object.
282
283    **Trust model (Aster-trust-spec.md §1.1):** The root *private* key is
284    offline -- it never touches a running node. Nodes receive only the root
285    *public* key (to verify credentials) and optionally an enrollment
286    credential (a pre-signed token for mesh join). The founding node of a
287    mesh needs no enrollment credential; it bootstraps the accepted-producer
288    set with just its own EndpointId.
289
290    Three ways to get an ``AsterConfig``:
291
292    1. **Auto from env** (default when ``AsterServer`` gets no config)::
293
294           config = AsterConfig.from_env()
295
296    2. **From a TOML file** (with env overrides)::
297
298           config = AsterConfig.from_file("aster.toml")
299
300    3. **Inline** (testing, scripts)::
301
302           config = AsterConfig(root_pubkey=pub)
303    """
304
305    # ── Trust ────────────────────────────────────────────────────────────
306
307    root_pubkey: bytes | None = None
308    """32-byte ed25519 root public key (the deployment trust anchor).
309    Highest priority -- overrides ``root_pubkey_file`` when set."""
310
311    root_pubkey_file: str | None = None
312    """Path to a file containing the root public key.  Accepts either a
313    plain hex string or a JSON object with a ``"public_key"`` field."""
314
315    enrollment_credential_file: str | None = None
316    """Path to a JSON enrollment credential (pre-signed by the offline root
317    key).  Required when a node joins an existing producer mesh.  Not needed
318    for the founding node or for dev/ephemeral mode."""
319
320    allow_all_consumers: bool = False
321    """Skip consumer admission gate. Default: gate consumers."""
322
323    enrollment_credential_iid: str | None = None
324    """Cloud Instance Identity Document token.  Required when the enrollment
325    credential's policy includes ``aster.iid_*`` attributes (AWS/GCP/Azure).
326    Set via ``ASTER_ENROLLMENT_CREDENTIAL_IID``."""
327
328    allow_all_consumers: bool = False
329    """Skip consumer admission gate. Default: gate consumers."""
330
331    allow_all_producers: bool = True
332    """Skip producer admission gate. Default: allow all producers."""
333
334    # ── Connect (consumer-side) ──────────────────────────────────────────
335
336    endpoint_addr: str | None = None
337    """Producer's endpoint address (base64 NodeAddr or EndpointId hex string).
338    The consumer dials this to reach RPC + admission + blobs/docs/gossip.
339    Set via ``ASTER_ENDPOINT_ADDR``."""
340
341    # ── Storage ──────────────────────────────────────────────────────────
342
343    storage_path: str | None = None
344    """If set, use FsStore at this path; otherwise in-memory."""
345
346    # ── Network (forwarded to EndpointConfig) ────────────────────────────
347
348    secret_key: bytes | None = None
349    """32-byte node identity key. Determines the stable ``EndpointId``.
350    If unset, a fresh identity is generated each run (fine for dev)."""
351
352    relay_mode: str | None = None
353    bind_addr: str | None = None
354    enable_monitoring: bool = False
355    enable_hooks: bool = False
356    hook_timeout_ms: int = 5000
357    clear_ip_transports: bool = False
358    clear_relay_transports: bool = False
359    portmapper_config: str | None = None
360    proxy_url: str | None = None
361    proxy_from_env: bool = False
362    local_discovery: bool = False
363    """Enable mDNS local network discovery. Nodes on the same LAN can find
364    each other without relay servers. Default off.
365    Env: ``ASTER_LOCAL_DISCOVERY``. TOML: ``[network] local_discovery``."""
366
367    # ── Logging / observability ─────────────────────────────────────────
368
369    log_format: str = "text"
370    """Log output format: ``"json"`` for structured (k8s/ELK) or ``"text"`` for dev.
371    Env: ``ASTER_LOG_FORMAT``. TOML: ``[logging] format``."""
372
373    log_level: str = "info"
374    """Log level: ``"debug"``, ``"info"``, ``"warning"``, ``"error"``.
375    Env: ``ASTER_LOG_LEVEL``. TOML: ``[logging] level``."""
376
377    log_mask: bool = True
378    """Mask sensitive fields in logs (keys, credentials, endpoint IDs).
379    Env: ``ASTER_LOG_MASK``. TOML: ``[logging] mask``."""
380
381    # ── Identity file ────────────────────────────────────────────────────
382
383    identity_file: str | None = None
384    """Path to ``.aster-identity`` TOML file.  When set, the node key and
385    enrollment credentials are loaded from this file.
386    Env: ``ASTER_IDENTITY_FILE``.  Default: looks for ``.aster-identity``
387    in the current working directory."""
388
389    # ── Internal (not user-facing) ───────────────────────────────────────
390
391    _sources: dict = dc_field(default_factory=dict, repr=False)
392    """Provenance tracker: maps field name → source string
393    (e.g. ``"ASTER_ROOT_PUBKEY_FILE"``, ``"aster.toml [trust]"``, ``"default"``)."""
394
395    _ephemeral_privkey: bytes | None = dc_field(default=None, repr=False)
396    """Transient private key generated in dev mode.  Used only by
397    ``simple_consumer`` to auto-mint a credential.  Never persisted,
398    never configurable.  ``None`` in production."""
399
400    # ── Resolve ──────────────────────────────────────────────────────────
401
402    def resolve_root_pubkey(self) -> bytes | None:
403        """Return the root public key, resolving from config sources.
404
405        Resolution order:
406
407        1. Inline ``root_pubkey`` (highest priority).
408        2. ``root_pubkey_file`` (hex string or JSON with ``public_key``).
409        3. Generate an ephemeral keypair (dev mode only -- logged).
410
411        The root *private* key never appears here. In dev mode, a transient
412        private key is stored on ``_ephemeral_privkey`` so the companion
413        ``simple_consumer`` example can auto-mint a credential; it is never
414        persisted or configurable.
415        """
416        if self.root_pubkey is not None:
417            return self.root_pubkey
418
419        if self.root_pubkey_file:
420            pub = _load_pubkey_from_file(self.root_pubkey_file)
421            if pub is not None:
422                self.root_pubkey = pub
423                return pub
424            _config_logger.warning(
425                "Root pubkey file %s not found or invalid", self.root_pubkey_file
426            )
427
428        # Dev mode: generate ephemeral keypair if admission is needed.
429        if not self.allow_all_consumers or not self.allow_all_producers:
430            from .trust.signing import generate_root_keypair
431
432            priv, pub = generate_root_keypair()
433            self._ephemeral_privkey = priv  # transient, never persisted
434            self.root_pubkey = pub
435            self._sources["root_pubkey"] = "ephemeral (dev mode)"
436            _config_logger.info(
437                "Generated ephemeral root key "
438                "(set ASTER_ROOT_PUBKEY_FILE for production)"
439            )
440            return pub
441
442        return None
443
444    def load_identity(self, peer_name: str | None = None,
445                      role: str | None = None) -> tuple[bytes | None, dict | None]:
446        """Load a peer entry from the ``.aster-identity`` file.
447
448        Args:
449            peer_name: Select peer by ``name``.  If None, auto-selects by
450                ``role`` (first match).
451            role: Fallback selector when ``peer_name`` is None.
452                ``"producer"`` for AsterServer, ``"consumer"`` for AsterClient.
453
454        Returns:
455            ``(secret_key_bytes, peer_dict)`` or ``(None, None)`` when no
456            identity file is found.
457        """
458        import base64 as _b64
459
460        path = self._resolve_identity_path()
461        if path is None:
462            return None, None
463
464        if sys.version_info >= (3, 11):
465            import tomllib as _tl
466        else:
467            import tomli as _tl  # type: ignore[no-redef]
468
469        with open(path, "rb") as f:
470            data = _tl.load(f)
471
472        node = data.get("node", {})
473        secret_key_b64 = node.get("secret_key")
474        secret_key = _b64.b64decode(secret_key_b64) if secret_key_b64 else None
475
476        # Find the peer entry
477        peers = data.get("peers", [])
478        peer = None
479        if peer_name is not None:
480            for p in peers:
481                if p.get("name") == peer_name:
482                    peer = p
483                    break
484        elif role is not None:
485            for p in peers:
486                if p.get("role") == role:
487                    peer = p
488                    break
489        elif peers:
490            peer = peers[0]  # single peer → auto-select
491
492        return secret_key, peer
493
494    def _resolve_identity_path(self) -> str | None:
495        """Find the .aster-identity file."""
496        if self.identity_file:
497            expanded = os.path.expanduser(self.identity_file)
498            return expanded if os.path.exists(expanded) else None
499        # Default: look in cwd
500        default = os.path.join(os.getcwd(), ".aster-identity")
501        return default if os.path.exists(default) else None
502
503    def load_identity_from_path(
504        self,
505        path: str,
506        peer_name: str | None = None,
507        role: str | None = None,
508    ) -> tuple[bytes | None, dict | None]:
509        """Load secret_key + peer entry from a specific TOML identity file.
510
511        Same return shape as :meth:`load_identity` but reads from an
512        explicit path instead of consulting ``self.identity_file``. Used
513        by ``AsterClient`` to fold the secret key out of an
514        ``enrollment_credential_file`` when no separate ``identity=`` was
515        provided -- both options should do the same thing for the same
516        TOML file produced by ``aster enroll node``.
517        """
518        import base64 as _b64
519
520        if sys.version_info >= (3, 11):
521            import tomllib as _tl
522        else:
523            import tomli as _tl  # type: ignore[no-redef]
524
525        with open(path, "rb") as f:
526            data = _tl.load(f)
527
528        node = data.get("node", {})
529        secret_key_b64 = node.get("secret_key")
530        secret_key = _b64.b64decode(secret_key_b64) if secret_key_b64 else None
531
532        peers = data.get("peers", [])
533        peer = None
534        if peer_name is not None:
535            for p in peers:
536                if p.get("name") == peer_name:
537                    peer = p
538                    break
539        elif role is not None:
540            for p in peers:
541                if p.get("role") == role:
542                    peer = p
543                    break
544        elif peers:
545            peer = peers[0]
546
547        return secret_key, peer
548
549    def to_endpoint_config(self) -> EndpointConfig | None:
550        """Build an :class:`EndpointConfig` from the network fields.
551
552        Returns ``None`` when all network fields are at their defaults
553        (so ``IrohNode.memory_with_alpns`` can use the fast
554        ``Endpoint::bind(presets::N0)`` path).
555        """
556        has_custom = any([
557            self.relay_mode, self.secret_key, self.bind_addr,
558            self.enable_monitoring, self.enable_hooks,
559            self.clear_ip_transports, self.clear_relay_transports,
560            self.portmapper_config, self.proxy_url, self.proxy_from_env,
561            self.local_discovery,
562            self.hook_timeout_ms != 5000,
563        ])
564        if not has_custom:
565            return None
566        return EndpointConfig(
567            alpns=[],  # Router sets ALPNs
568            relay_mode=self.relay_mode,
569            secret_key=self.secret_key,
570            enable_monitoring=self.enable_monitoring,
571            enable_hooks=self.enable_hooks,
572            hook_timeout_ms=self.hook_timeout_ms,
573            bind_addr=self.bind_addr,
574            clear_ip_transports=self.clear_ip_transports,
575            clear_relay_transports=self.clear_relay_transports,
576            portmapper_config=self.portmapper_config,
577            proxy_url=self.proxy_url,
578            proxy_from_env=self.proxy_from_env,
579            enable_local_discovery=self.local_discovery,
580        )
581
582    # ── print_config ─────────────────────────────────────────────────────
583
584    def print_config(self, *, json: bool = False) -> str:
585        """Render the resolved configuration with provenance and masking.
586
587        Sensitive fields (``secret_key``, ``enrollment_credential_file``)
588        are masked.  ``root_pubkey`` is public and shown in full.
589
590        Args:
591            json: If True, return a JSON string; otherwise a human-readable table.
592
593        Returns:
594            The rendered config string (also printed to stdout).
595        """
596        sections = {
597            "trust": [
598                ("root_pubkey", self._fmt_bytes(self.root_pubkey, mask=False)),
599                ("root_pubkey_file", self.root_pubkey_file or "<not set>"),
600                ("enrollment_credential_file", self._fmt_masked(self.enrollment_credential_file)),
601                ("enrollment_credential_iid", self._fmt_masked(self.enrollment_credential_iid)),
602                ("allow_all_consumers", self.allow_all_consumers),
603                ("allow_all_producers", self.allow_all_producers),
604            ],
605            "connect": [
606                ("endpoint_addr", self.endpoint_addr or "<not set>"),
607            ],
608            "network": [
609                ("secret_key", self._fmt_bytes(self.secret_key, mask=True)),
610                ("relay_mode", self.relay_mode or "<default>"),
611                ("bind_addr", self.bind_addr or "<any>"),
612                ("enable_monitoring", self.enable_monitoring),
613                ("enable_hooks", self.enable_hooks),
614            ],
615            "storage": [
616                ("path", self.storage_path or "<in-memory>"),
617            ],
618        }
619
620        if json:
621            out: dict = {}
622            for section, fields in sections.items():
623                out[section] = {}
624                for name, value in fields:
625                    source = self._sources.get(name, "default")
626                    out[section][name] = {"value": value, "source": source}
627            text = _json.dumps(out, indent=2, default=str)
628        else:
629            lines = []
630            for section, fields in sections.items():
631                lines.append(f"  [{section}]")
632                for name, value in fields:
633                    source = self._sources.get(name, "default")
634                    lines.append(f"    {name:<28s}: {value!s:<36s} ({source})")
635            text = "\n".join(lines)
636
637        print(text)
638        return text
639
640    @staticmethod
641    def _fmt_bytes(val: bytes | None, *, mask: bool) -> str:
642        if val is None:
643            return "<not set>"
644        h = val.hex()
645        if mask:
646            return f"****...{h[-8:]}" if len(h) > 8 else "****"
647        return h
648
649    @staticmethod
650    def _fmt_masked(val: str | None) -> str:
651        if val is None:
652            return "<not set>"
653        return f"****...{val[-12:]}" if len(val) > 12 else "****"
654
655    # ── Factory methods ──────────────────────────────────────────────────
656
657    @classmethod
658    def from_env(cls) -> "AsterConfig":
659        """Build config from ``ASTER_*`` environment variables only."""
660        return cls._load(toml_data=None, toml_path=None)
661
662    @classmethod
663    def from_file(cls, path: str | Path) -> "AsterConfig":
664        """Build config from a TOML file, with env-var overrides."""
665        with Path(path).open("rb") as fh:
666            raw = tomllib.load(fh)
667        return cls._load(toml_data=raw, toml_path=str(path))
668
669    @classmethod
670    def _load(cls, toml_data: dict | None, toml_path: str | None) -> "AsterConfig":
671        kwargs: dict = {}
672        sources: dict[str, str] = {}
673        env = os.environ
674        toml_label = toml_path or "aster.toml"
675
676        def _set(field: str, value, source: str) -> None:
677            kwargs[field] = value
678            sources[field] = source
679
680        # ── Trust (TOML [trust] section) ─────────────────────────────────
681        trust = (toml_data or {}).get("trust", {})
682        if "root_pubkey" in trust:
683            raw = trust["root_pubkey"]
684            _set("root_pubkey", bytes.fromhex(raw), f"{toml_label} [trust]")
685        if "root_pubkey_file" in trust:
686            _set("root_pubkey_file", str(trust["root_pubkey_file"]), f"{toml_label} [trust]")
687        if "enrollment_credential" in trust:
688            _set("enrollment_credential_file", str(trust["enrollment_credential"]), f"{toml_label} [trust]")
689        if "enrollment_credential_iid" in trust:
690            _set("enrollment_credential_iid", str(trust["enrollment_credential_iid"]), f"{toml_label} [trust]")
691        if "allow_all_consumers" in trust:
692            _set("allow_all_consumers", bool(trust["allow_all_consumers"]), f"{toml_label} [trust]")
693        if "allow_all_producers" in trust:
694            _set("allow_all_producers", bool(trust["allow_all_producers"]), f"{toml_label} [trust]")
695
696        # ── Connect (TOML [connect] section) ─────────────────────────────
697        connect = (toml_data or {}).get("connect", {})
698        if "endpoint_addr" in connect:
699            _set("endpoint_addr", str(connect["endpoint_addr"]), f"{toml_label} [connect]")
700
701        # ── Storage (TOML [storage] section) ─────────────────────────────
702        storage = (toml_data or {}).get("storage", {})
703        if "path" in storage:
704            _set("storage_path", str(storage["path"]), f"{toml_label} [storage]")
705
706        # ── Network (TOML [network] section) ─────────────────────────────
707        network = (toml_data or {}).get("network", {})
708        _NET_FIELDS = ("relay_mode", "bind_addr", "portmapper_config", "proxy_url")
709        for f in _NET_FIELDS:
710            if f in network:
711                _set(f, str(network[f]) if network[f] is not None else None, f"{toml_label} [network]")
712        _NET_BOOLS = (
713            "enable_monitoring", "enable_hooks",
714            "clear_ip_transports", "clear_relay_transports", "proxy_from_env",
715            "local_discovery",
716        )
717        for f in _NET_BOOLS:
718            if f in network:
719                _set(f, bool(network[f]), f"{toml_label} [network]")
720        if "hook_timeout_ms" in network:
721            _set("hook_timeout_ms", int(network["hook_timeout_ms"]), f"{toml_label} [network]")
722        if "secret_key" in network and network["secret_key"] is not None:
723            _set("secret_key", base64.b64decode(network["secret_key"]), f"{toml_label} [network]")
724
725        # ── Logging (TOML [logging] section) ─────────────────────────────
726        log_sec = (toml_data or {}).get("logging", {})
727        if "format" in log_sec:
728            _set("log_format", str(log_sec["format"]).lower(), f"{toml_label} [logging]")
729        if "level" in log_sec:
730            _set("log_level", str(log_sec["level"]).lower(), f"{toml_label} [logging]")
731        if "mask" in log_sec:
732            _set("log_mask", bool(log_sec["mask"]), f"{toml_label} [logging]")
733
734        # ── Env overrides (always win) ───────────────────────────────────
735        if (v := env.get("ASTER_ROOT_PUBKEY")) is not None:
736            _set("root_pubkey", bytes.fromhex(v.strip()), "ASTER_ROOT_PUBKEY")
737        if (v := env.get("ASTER_ROOT_PUBKEY_FILE")) is not None:
738            _set("root_pubkey_file", v.strip(), "ASTER_ROOT_PUBKEY_FILE")
739        if (v := env.get("ASTER_ENROLLMENT_CREDENTIAL")) is not None:
740            _set("enrollment_credential_file", v.strip(), "ASTER_ENROLLMENT_CREDENTIAL")
741        if (v := env.get("ASTER_ENROLLMENT_CREDENTIAL_IID")) is not None:
742            _set("enrollment_credential_iid", v.strip(), "ASTER_ENROLLMENT_CREDENTIAL_IID")
743        if (v := env.get("ASTER_ENDPOINT_ADDR")) is not None:
744            _set("endpoint_addr", v.strip(), "ASTER_ENDPOINT_ADDR")
745        if (v := env.get("ASTER_IDENTITY_FILE")) is not None:
746            _set("identity_file", v.strip(), "ASTER_IDENTITY_FILE")
747        if (v := env.get("ASTER_ALLOW_ALL_CONSUMERS")) is not None:
748            _set("allow_all_consumers", _parse_bool(v, "ASTER_ALLOW_ALL_CONSUMERS"), "ASTER_ALLOW_ALL_CONSUMERS")
749        if (v := env.get("ASTER_ALLOW_ALL_PRODUCERS")) is not None:
750            _set("allow_all_producers", _parse_bool(v, "ASTER_ALLOW_ALL_PRODUCERS"), "ASTER_ALLOW_ALL_PRODUCERS")
751        if (v := env.get("ASTER_STORAGE_PATH")) is not None:
752            _set("storage_path", v.strip() or None, "ASTER_STORAGE_PATH")
753        if (v := env.get("ASTER_SECRET_KEY")) is not None:
754            _set("secret_key", base64.b64decode(v.strip()) if v.strip() else None, "ASTER_SECRET_KEY")
755        if (v := env.get("ASTER_RELAY_MODE")) is not None:
756            _set("relay_mode", v.strip() or None, "ASTER_RELAY_MODE")
757        if (v := env.get("ASTER_BIND_ADDR")) is not None:
758            _set("bind_addr", v.strip() or None, "ASTER_BIND_ADDR")
759        for f in _NET_BOOLS:
760            var = f"ASTER_{f.upper()}"
761            if (v := env.get(var)) is not None:
762                _set(f, _parse_bool(v, var), var)
763        if (v := env.get("ASTER_HOOK_TIMEOUT_MS")) is not None:
764            _set("hook_timeout_ms", int(v), "ASTER_HOOK_TIMEOUT_MS")
765        for f in ("portmapper_config", "proxy_url"):
766            var = f"ASTER_{f.upper()}"
767            if (v := env.get(var)) is not None:
768                _set(f, v.strip() or None, var)
769
770        # Logging env overrides
771        if (v := env.get("ASTER_LOG_FORMAT")) is not None:
772            _set("log_format", v.strip().lower(), "ASTER_LOG_FORMAT")
773        if (v := env.get("ASTER_LOG_LEVEL")) is not None:
774            _set("log_level", v.strip().lower(), "ASTER_LOG_LEVEL")
775        if (v := env.get("ASTER_LOG_MASK")) is not None:
776            _set("log_mask", _parse_bool(v, "ASTER_LOG_MASK"), "ASTER_LOG_MASK")
777
778        obj = cls(**{k: v for k, v in kwargs.items() if k != "_sources"})
779        obj._sources = sources
780        return obj

Unified configuration for AsterServer.

Combines trust (root public key, admission policy), storage (memory vs persistent), and networking (relay, bind address, etc.) into one object.

Trust model (Aster-trust-spec.md §1.1): The root private key is offline -- it never touches a running node. Nodes receive only the root public key (to verify credentials) and optionally an enrollment credential (a pre-signed token for mesh join). The founding node of a mesh needs no enrollment credential; it bootstraps the accepted-producer set with just its own EndpointId.

Three ways to get an AsterConfig:

  1. Auto from env (default when AsterServer gets no config)::

    config = AsterConfig.from_env()

  2. From a TOML file (with env overrides)::

    config = AsterConfig.from_file("aster.toml")

  3. Inline (testing, scripts)::

    config = AsterConfig(root_pubkey=pub)

root_pubkey: bytes | None = None

32-byte ed25519 root public key (the deployment trust anchor). Highest priority -- overrides root_pubkey_file when set.

root_pubkey_file: str | None = None

Path to a file containing the root public key. Accepts either a plain hex string or a JSON object with a "public_key" field.

enrollment_credential_file: str | None = None

Path to a JSON enrollment credential (pre-signed by the offline root key). Required when a node joins an existing producer mesh. Not needed for the founding node or for dev/ephemeral mode.

allow_all_consumers: bool = False

Skip consumer admission gate. Default: gate consumers.

enrollment_credential_iid: str | None = None

Cloud Instance Identity Document token. Required when the enrollment credential's policy includes aster.iid_* attributes (AWS/GCP/Azure). Set via ASTER_ENROLLMENT_CREDENTIAL_IID.

allow_all_producers: bool = True

Skip producer admission gate. Default: allow all producers.

endpoint_addr: str | None = None

Producer's endpoint address (base64 NodeAddr or EndpointId hex string). The consumer dials this to reach RPC + admission + blobs/docs/gossip. Set via ASTER_ENDPOINT_ADDR.

storage_path: str | None = None

If set, use FsStore at this path; otherwise in-memory.

secret_key: bytes | None = None

32-byte node identity key. Determines the stable EndpointId. If unset, a fresh identity is generated each run (fine for dev).

local_discovery: bool = False

Enable mDNS local network discovery. Nodes on the same LAN can find each other without relay servers. Default off. Env: ASTER_LOCAL_DISCOVERY. TOML: [network] local_discovery.

log_format: str = 'text'

Log output format: "json" for structured (k8s/ELK) or "text" for dev. Env: ASTER_LOG_FORMAT. TOML: [logging] format.

log_level: str = 'info'

Log level: "debug", "info", "warning", "error". Env: ASTER_LOG_LEVEL. TOML: [logging] level.

log_mask: bool = True

Mask sensitive fields in logs (keys, credentials, endpoint IDs). Env: ASTER_LOG_MASK. TOML: [logging] mask.

identity_file: str | None = None

Path to .aster-identity TOML file. When set, the node key and enrollment credentials are loaded from this file. Env: ASTER_IDENTITY_FILE. Default: looks for .aster-identity in the current working directory.

def resolve_root_pubkey(self) -> bytes | None:
402    def resolve_root_pubkey(self) -> bytes | None:
403        """Return the root public key, resolving from config sources.
404
405        Resolution order:
406
407        1. Inline ``root_pubkey`` (highest priority).
408        2. ``root_pubkey_file`` (hex string or JSON with ``public_key``).
409        3. Generate an ephemeral keypair (dev mode only -- logged).
410
411        The root *private* key never appears here. In dev mode, a transient
412        private key is stored on ``_ephemeral_privkey`` so the companion
413        ``simple_consumer`` example can auto-mint a credential; it is never
414        persisted or configurable.
415        """
416        if self.root_pubkey is not None:
417            return self.root_pubkey
418
419        if self.root_pubkey_file:
420            pub = _load_pubkey_from_file(self.root_pubkey_file)
421            if pub is not None:
422                self.root_pubkey = pub
423                return pub
424            _config_logger.warning(
425                "Root pubkey file %s not found or invalid", self.root_pubkey_file
426            )
427
428        # Dev mode: generate ephemeral keypair if admission is needed.
429        if not self.allow_all_consumers or not self.allow_all_producers:
430            from .trust.signing import generate_root_keypair
431
432            priv, pub = generate_root_keypair()
433            self._ephemeral_privkey = priv  # transient, never persisted
434            self.root_pubkey = pub
435            self._sources["root_pubkey"] = "ephemeral (dev mode)"
436            _config_logger.info(
437                "Generated ephemeral root key "
438                "(set ASTER_ROOT_PUBKEY_FILE for production)"
439            )
440            return pub
441
442        return None

Return the root public key, resolving from config sources.

Resolution order:

  1. Inline root_pubkey (highest priority).
  2. root_pubkey_file (hex string or JSON with public_key).
  3. Generate an ephemeral keypair (dev mode only -- logged).

The root private key never appears here. In dev mode, a transient private key is stored on _ephemeral_privkey so the companion simple_consumer example can auto-mint a credential; it is never persisted or configurable.

def load_identity( self, peer_name: str | None = None, role: str | None = None) -> tuple[bytes | None, dict | None]:
444    def load_identity(self, peer_name: str | None = None,
445                      role: str | None = None) -> tuple[bytes | None, dict | None]:
446        """Load a peer entry from the ``.aster-identity`` file.
447
448        Args:
449            peer_name: Select peer by ``name``.  If None, auto-selects by
450                ``role`` (first match).
451            role: Fallback selector when ``peer_name`` is None.
452                ``"producer"`` for AsterServer, ``"consumer"`` for AsterClient.
453
454        Returns:
455            ``(secret_key_bytes, peer_dict)`` or ``(None, None)`` when no
456            identity file is found.
457        """
458        import base64 as _b64
459
460        path = self._resolve_identity_path()
461        if path is None:
462            return None, None
463
464        if sys.version_info >= (3, 11):
465            import tomllib as _tl
466        else:
467            import tomli as _tl  # type: ignore[no-redef]
468
469        with open(path, "rb") as f:
470            data = _tl.load(f)
471
472        node = data.get("node", {})
473        secret_key_b64 = node.get("secret_key")
474        secret_key = _b64.b64decode(secret_key_b64) if secret_key_b64 else None
475
476        # Find the peer entry
477        peers = data.get("peers", [])
478        peer = None
479        if peer_name is not None:
480            for p in peers:
481                if p.get("name") == peer_name:
482                    peer = p
483                    break
484        elif role is not None:
485            for p in peers:
486                if p.get("role") == role:
487                    peer = p
488                    break
489        elif peers:
490            peer = peers[0]  # single peer → auto-select
491
492        return secret_key, peer

Load a peer entry from the .aster-identity file.

Args: peer_name: Select peer by name. If None, auto-selects by role (first match). role: Fallback selector when peer_name is None. "producer" for AsterServer, "consumer" for AsterClient.

Returns: (secret_key_bytes, peer_dict) or (None, None) when no identity file is found.

def load_identity_from_path( self, path: str, peer_name: str | None = None, role: str | None = None) -> tuple[bytes | None, dict | None]:
503    def load_identity_from_path(
504        self,
505        path: str,
506        peer_name: str | None = None,
507        role: str | None = None,
508    ) -> tuple[bytes | None, dict | None]:
509        """Load secret_key + peer entry from a specific TOML identity file.
510
511        Same return shape as :meth:`load_identity` but reads from an
512        explicit path instead of consulting ``self.identity_file``. Used
513        by ``AsterClient`` to fold the secret key out of an
514        ``enrollment_credential_file`` when no separate ``identity=`` was
515        provided -- both options should do the same thing for the same
516        TOML file produced by ``aster enroll node``.
517        """
518        import base64 as _b64
519
520        if sys.version_info >= (3, 11):
521            import tomllib as _tl
522        else:
523            import tomli as _tl  # type: ignore[no-redef]
524
525        with open(path, "rb") as f:
526            data = _tl.load(f)
527
528        node = data.get("node", {})
529        secret_key_b64 = node.get("secret_key")
530        secret_key = _b64.b64decode(secret_key_b64) if secret_key_b64 else None
531
532        peers = data.get("peers", [])
533        peer = None
534        if peer_name is not None:
535            for p in peers:
536                if p.get("name") == peer_name:
537                    peer = p
538                    break
539        elif role is not None:
540            for p in peers:
541                if p.get("role") == role:
542                    peer = p
543                    break
544        elif peers:
545            peer = peers[0]
546
547        return secret_key, peer

Load secret_key + peer entry from a specific TOML identity file.

Same return shape as load_identity() but reads from an explicit path instead of consulting self.identity_file. Used by AsterClient to fold the secret key out of an enrollment_credential_file when no separate identity= was provided -- both options should do the same thing for the same TOML file produced by aster enroll node.

def to_endpoint_config(self) -> EndpointConfig | None:
549    def to_endpoint_config(self) -> EndpointConfig | None:
550        """Build an :class:`EndpointConfig` from the network fields.
551
552        Returns ``None`` when all network fields are at their defaults
553        (so ``IrohNode.memory_with_alpns`` can use the fast
554        ``Endpoint::bind(presets::N0)`` path).
555        """
556        has_custom = any([
557            self.relay_mode, self.secret_key, self.bind_addr,
558            self.enable_monitoring, self.enable_hooks,
559            self.clear_ip_transports, self.clear_relay_transports,
560            self.portmapper_config, self.proxy_url, self.proxy_from_env,
561            self.local_discovery,
562            self.hook_timeout_ms != 5000,
563        ])
564        if not has_custom:
565            return None
566        return EndpointConfig(
567            alpns=[],  # Router sets ALPNs
568            relay_mode=self.relay_mode,
569            secret_key=self.secret_key,
570            enable_monitoring=self.enable_monitoring,
571            enable_hooks=self.enable_hooks,
572            hook_timeout_ms=self.hook_timeout_ms,
573            bind_addr=self.bind_addr,
574            clear_ip_transports=self.clear_ip_transports,
575            clear_relay_transports=self.clear_relay_transports,
576            portmapper_config=self.portmapper_config,
577            proxy_url=self.proxy_url,
578            proxy_from_env=self.proxy_from_env,
579            enable_local_discovery=self.local_discovery,
580        )

Build an EndpointConfig from the network fields.

Returns None when all network fields are at their defaults (so IrohNode.memory_with_alpns can use the fast Endpoint::bind(presets::N0) path).

def print_config(self, *, json: bool = False) -> str:
584    def print_config(self, *, json: bool = False) -> str:
585        """Render the resolved configuration with provenance and masking.
586
587        Sensitive fields (``secret_key``, ``enrollment_credential_file``)
588        are masked.  ``root_pubkey`` is public and shown in full.
589
590        Args:
591            json: If True, return a JSON string; otherwise a human-readable table.
592
593        Returns:
594            The rendered config string (also printed to stdout).
595        """
596        sections = {
597            "trust": [
598                ("root_pubkey", self._fmt_bytes(self.root_pubkey, mask=False)),
599                ("root_pubkey_file", self.root_pubkey_file or "<not set>"),
600                ("enrollment_credential_file", self._fmt_masked(self.enrollment_credential_file)),
601                ("enrollment_credential_iid", self._fmt_masked(self.enrollment_credential_iid)),
602                ("allow_all_consumers", self.allow_all_consumers),
603                ("allow_all_producers", self.allow_all_producers),
604            ],
605            "connect": [
606                ("endpoint_addr", self.endpoint_addr or "<not set>"),
607            ],
608            "network": [
609                ("secret_key", self._fmt_bytes(self.secret_key, mask=True)),
610                ("relay_mode", self.relay_mode or "<default>"),
611                ("bind_addr", self.bind_addr or "<any>"),
612                ("enable_monitoring", self.enable_monitoring),
613                ("enable_hooks", self.enable_hooks),
614            ],
615            "storage": [
616                ("path", self.storage_path or "<in-memory>"),
617            ],
618        }
619
620        if json:
621            out: dict = {}
622            for section, fields in sections.items():
623                out[section] = {}
624                for name, value in fields:
625                    source = self._sources.get(name, "default")
626                    out[section][name] = {"value": value, "source": source}
627            text = _json.dumps(out, indent=2, default=str)
628        else:
629            lines = []
630            for section, fields in sections.items():
631                lines.append(f"  [{section}]")
632                for name, value in fields:
633                    source = self._sources.get(name, "default")
634                    lines.append(f"    {name:<28s}: {value!s:<36s} ({source})")
635            text = "\n".join(lines)
636
637        print(text)
638        return text

Render the resolved configuration with provenance and masking.

Sensitive fields (secret_key, enrollment_credential_file) are masked. root_pubkey is public and shown in full.

Args: json: If True, return a JSON string; otherwise a human-readable table.

Returns: The rendered config string (also printed to stdout).

@classmethod
def from_env(cls) -> aster.config.AsterConfig:
657    @classmethod
658    def from_env(cls) -> "AsterConfig":
659        """Build config from ``ASTER_*`` environment variables only."""
660        return cls._load(toml_data=None, toml_path=None)

Build config from ASTER_* environment variables only.

@classmethod
def from_file(cls, path: str | pathlib._local.Path) -> aster.config.AsterConfig:
662    @classmethod
663    def from_file(cls, path: str | Path) -> "AsterConfig":
664        """Build config from a TOML file, with env-var overrides."""
665        with Path(path).open("rb") as fh:
666            raw = tomllib.load(fh)
667        return cls._load(toml_data=raw, toml_path=str(path))

Build config from a TOML file, with env-var overrides.

def service( name_or_cls=None, *, name: str | None = None, version: int = 1, serialization: list[aster.rpc_types.SerializationMode] | aster.rpc_types.SerializationMode | None = None, scoped: str = 'shared', interceptors: list[type] | None = None, max_concurrent_streams: int | None = None, requires: aster.contract.identity.CapabilityRequirement | None = None, public: bool = False, metadata: Any = None) -> Union[Callable[[type], type], type]:
545def service(
546    name_or_cls=None,
547    *,
548    name: str | None = None,
549    version: int = 1,
550    serialization: list[SerializationMode] | SerializationMode | None = None,
551    scoped: str = "shared",
552    interceptors: list[type] | None = None,
553    max_concurrent_streams: int | None = None,
554    requires: CapabilityRequirement | None = None,
555    public: bool = False,
556    metadata: Any = None,
557) -> Callable[[type], type] | type:
558    """Class decorator to mark a class as an Aster RPC service.
559
560    Supports three calling forms::
561
562        @service                                  # bare -- name = class name
563        @service("AgentControl")                  # explicit name
564        @service(name="AgentControl", version=2)  # keyword name
565        @service(version=2, scoped="session")     # keyword-only, name = class name
566
567    Args:
568        name_or_cls: Service name (str), or the class itself when used as
569            bare ``@service`` without parentheses. Defaults to the class name.
570        name: Explicit service name (keyword-only alias for name_or_cls).
571        version: The service version (default: 1).
572        serialization: Supported serialization modes. Defaults to [XLANG].
573        scoped: Service scope: "shared" or "session". Default "shared".
574            The legacy alias "stream" is still accepted on input.
575        interceptors: List of interceptor classes to apply to all methods.
576        max_concurrent_streams: Maximum concurrent QUIC streams per
577            connection (i.e. per client peer). When a client exceeds
578            this limit, additional streams are rejected at the QUIC
579            layer. This applies to the server as a whole, not
580            per-service. ``None`` means unlimited.
581
582    Returns:
583        A decorator function (or the decorated class if used bare).
584    """
585    # Merge positional name_or_cls with keyword name=
586    if name is not None and name_or_cls is None:
587        name_or_cls = name
588
589    # @service (bare, no parens) -- name_or_cls is the class itself
590    if isinstance(name_or_cls, type):
591        return _apply_service_decorator(
592            name_or_cls,
593            name=name_or_cls.__name__,
594            version=version,
595            serialization=serialization,
596            scoped=scoped,
597            interceptors=interceptors,
598            max_concurrent_streams=max_concurrent_streams,
599            requires=requires,
600            public=public,
601            metadata=metadata,
602        )
603
604    # @service() or @service("Foo") or @service(name="Foo", version=2)
605    name = name_or_cls  # str | None
606
607    def decorator(cls: type) -> type:
608        return _apply_service_decorator(
609            cls,
610            name=name if name is not None else cls.__name__,
611            version=version,
612            serialization=serialization,
613            scoped=scoped,
614            interceptors=interceptors,
615            max_concurrent_streams=max_concurrent_streams,
616            requires=requires,
617            public=public,
618            metadata=metadata,
619        )
620
621    return decorator

Class decorator to mark a class as an Aster RPC service.

Supports three calling forms::

@service                                  # bare -- name = class name
@service("AgentControl")                  # explicit name
@service(name="AgentControl", version=2)  # keyword name
@service(version=2, scoped="session")     # keyword-only, name = class name

Args: name_or_cls: Service name (str), or the class itself when used as bare @service without parentheses. Defaults to the class name. name: Explicit service name (keyword-only alias for name_or_cls). version: The service version (default: 1). serialization: Supported serialization modes. Defaults to [XLANG]. scoped: Service scope: "shared" or "session". Default "shared". The legacy alias "stream" is still accepted on input. interceptors: List of interceptor classes to apply to all methods. max_concurrent_streams: Maximum concurrent QUIC streams per connection (i.e. per client peer). When a client exceeds this limit, additional streams are rejected at the QUIC layer. This applies to the server as a whole, not per-service. None means unlimited.

Returns: A decorator function (or the decorated class if used bare).

def server_stream( method_or_timeout: Union[Callable[~P, AsyncIterator[Any]], float, NoneType] = None, name: str | None = None, timeout: float | None = None, serialization: aster.rpc_types.SerializationMode | None = None, requires: aster.contract.identity.CapabilityRequirement | None = None, metadata: Any = None) -> Callable[~P, AsyncIterator[Any]]:
280def server_stream(
281    method_or_timeout: Callable[P, AsyncIterator[Any]] | float | None = None,
282    name: str | None = None,
283    timeout: float | None = None,
284    serialization: SerializationMode | None = None,
285    requires: CapabilityRequirement | None = None,
286    metadata: Any = None,
287) -> Callable[P, AsyncIterator[Any]]:
288    """Decorator to mark a method as a server-streaming RPC.
289
290    Can be used as:
291        @server_stream  # without parens
292        @server_stream()  # with parens
293        @server_stream(timeout=30.0)  # with options
294
295    Args:
296        method_or_timeout: Either the method (when used without parens) or a timeout value.
297        timeout: Optional timeout in seconds.
298        serialization: Override the serialization mode for this method.
299
300    Returns:
301        A decorator function or the decorated method.
302
303    Example::
304
305        @service(name="MyService", version=1)
306        class MyService:
307            @server_stream
308            async def watch_items(self, req: WatchRequest) -> AsyncIterator[ItemUpdate]:
309                for item in items:
310                    yield ItemUpdate(item=item)
311    """
312    # Handle @server_stream (no parens) - method is passed directly
313    if callable(method_or_timeout):
314        method = method_or_timeout
315        _apply_server_stream_decorator(method, name=name, timeout=timeout, serialization=serialization, requires=requires, metadata=metadata)
316        return method
317
318    # Handle @server_stream() or @server_stream(timeout=...)
319    actual_timeout = method_or_timeout if method_or_timeout is not None else timeout
320
321    def decorator(method: Callable[P, AsyncIterator[Any]]) -> Callable[P, AsyncIterator[Any]]:
322        _apply_server_stream_decorator(method, name=name, timeout=actual_timeout, serialization=serialization, requires=requires, metadata=metadata)
323        return method
324
325    return decorator

Decorator to mark a method as a server-streaming RPC.

Can be used as: @server_stream # without parens @server_stream() # with parens @server_stream(timeout=30.0) # with options

Args: method_or_timeout: Either the method (when used without parens) or a timeout value. timeout: Optional timeout in seconds. serialization: Override the serialization mode for this method.

Returns: A decorator function or the decorated method.

Example::

@service(name="MyService", version=1)
class MyService:
    @server_stream
    async def watch_items(self, req: WatchRequest) -> AsyncIterator[ItemUpdate]:
        for item in items:
            yield ItemUpdate(item=item)
def client_stream( method_or_timeout: Union[Callable[~P, Any], float, NoneType] = None, name: str | None = None, idempotent: bool = False, serialization: aster.rpc_types.SerializationMode | None = None, requires: aster.contract.identity.CapabilityRequirement | None = None, metadata: Any = None) -> Union[Callable[~P, Any], Callable]:
367def client_stream(
368    method_or_timeout: Callable[P, Any] | float | None = None,
369    name: str | None = None,
370    idempotent: bool = False,
371    serialization: SerializationMode | None = None,
372    requires: CapabilityRequirement | None = None,
373    metadata: Any = None,
374) -> Callable[P, Any] | Callable:
375    """Decorator to mark a method as a client-streaming RPC.
376
377    Can be used as:
378        @client_stream  # without parens
379        @client_stream()  # with parens
380        @client_stream(timeout=30.0)  # with options
381
382    Args:
383        method_or_timeout: Either the method (when used without parens) or a timeout value.
384        idempotent: Whether the method is safe to retry.
385        serialization: Override the serialization mode for this method.
386
387    Returns:
388        A decorator function or the decorated method.
389
390    Example::
391
392        @service(name="MyService", version=1)
393        class MyService:
394            @client_stream
395            async def aggregate(self, reqs: AsyncIterator[NumberRequest]) -> SumResponse:
396                total = 0
397                async for req in reqs:
398                    total += req.value
399                return SumResponse(total=total)
400    """
401    # Handle @client_stream (no parens) - method is passed directly
402    if callable(method_or_timeout):
403        method = method_or_timeout
404        _apply_client_stream_decorator(method, name=name, idempotent=idempotent, serialization=serialization, requires=requires, metadata=metadata)
405        return method
406
407    # Handle @client_stream() or @client_stream(timeout=...)
408    timeout = method_or_timeout
409
410    def decorator(method: Callable[P, Any]) -> Callable[P, Any]:
411        _apply_client_stream_decorator(method, name=name, timeout=timeout, idempotent=idempotent, serialization=serialization, requires=requires, metadata=metadata)
412        return method
413
414    return decorator

Decorator to mark a method as a client-streaming RPC.

Can be used as: @client_stream # without parens @client_stream() # with parens @client_stream(timeout=30.0) # with options

Args: method_or_timeout: Either the method (when used without parens) or a timeout value. idempotent: Whether the method is safe to retry. serialization: Override the serialization mode for this method.

Returns: A decorator function or the decorated method.

Example::

@service(name="MyService", version=1)
class MyService:
    @client_stream
    async def aggregate(self, reqs: AsyncIterator[NumberRequest]) -> SumResponse:
        total = 0
        async for req in reqs:
            total += req.value
        return SumResponse(total=total)
def bidi_stream( method_or_timeout: Union[Callable[~P, AsyncIterator[Any]], float, NoneType] = None, name: str | None = None, timeout: float | None = None, serialization: aster.rpc_types.SerializationMode | None = None, requires: aster.contract.identity.CapabilityRequirement | None = None, metadata: Any = None) -> Callable[~P, AsyncIterator[Any]]:
456def bidi_stream(
457    method_or_timeout: Callable[P, AsyncIterator[Any]] | float | None = None,
458    name: str | None = None,
459    timeout: float | None = None,
460    serialization: SerializationMode | None = None,
461    requires: CapabilityRequirement | None = None,
462    metadata: Any = None,
463) -> Callable[P, AsyncIterator[Any]]:
464    """Decorator to mark a method as a bidirectional-streaming RPC.
465
466    Can be used as:
467        @bidi_stream  # without parens
468        @bidi_stream()  # with parens
469        @bidi_stream(timeout=30.0)  # with options
470
471    Args:
472        method_or_timeout: Either the method (when used without parens) or a timeout value.
473        timeout: Optional timeout in seconds.
474        serialization: Override the serialization mode for this method.
475
476    Returns:
477        A decorator function or the decorated method.
478
479    Example::
480
481        @service(name="MyService", version=1)
482        class MyService:
483            @bidi_stream
484            async def chat(
485                self, requests: AsyncIterator[ChatMessage]
486            ) -> AsyncIterator[ChatMessage]:
487                async for req in requests:
488                    yield ChatMessage(text=f"echo: {req.text}")
489    """
490    # Handle @bidi_stream (no parens) - method is passed directly
491    if callable(method_or_timeout):
492        method = method_or_timeout
493        _apply_bidi_stream_decorator(method, name=name, timeout=timeout, serialization=serialization, requires=requires, metadata=metadata)
494        return method
495
496    # Handle @bidi_stream() or @bidi_stream(timeout=...)
497    actual_timeout = method_or_timeout if method_or_timeout is not None else timeout
498
499    def decorator(method: Callable[P, AsyncIterator[Any]]) -> Callable[P, AsyncIterator[Any]]:
500        _apply_bidi_stream_decorator(method, name=name, timeout=actual_timeout, serialization=serialization, requires=requires, metadata=metadata)
501        return method
502
503    return decorator

Decorator to mark a method as a bidirectional-streaming RPC.

Can be used as: @bidi_stream # without parens @bidi_stream() # with parens @bidi_stream(timeout=30.0) # with options

Args: method_or_timeout: Either the method (when used without parens) or a timeout value. timeout: Optional timeout in seconds. serialization: Override the serialization mode for this method.

Returns: A decorator function or the decorated method.

Example::

@service(name="MyService", version=1)
class MyService:
    @bidi_stream
    async def chat(
        self, requests: AsyncIterator[ChatMessage]
    ) -> AsyncIterator[ChatMessage]:
        async for req in requests:
            yield ChatMessage(text=f"echo: {req.text}")
def wire_type(tag: str, *, metadata: dict | None = None):
 66def wire_type(tag: str, *, metadata: dict | None = None):
 67    """Declare a stable wire identity for a dataclass.
 68
 69    The *tag* string is split on the last ``/`` into
 70    ``(namespace, typename)``.  If there is no ``/``
 71    the namespace is the empty string.
 72
 73    Use ``@wire_type`` when you need explicit control over the wire
 74    identity of a type (production services, cross-language compat).
 75    If omitted, the ``@service`` decorator auto-derives a tag from the
 76    module + class name at decoration time.
 77
 78    Args:
 79        tag: Wire type tag (e.g., ``"billing/Invoice"``).
 80        metadata: Optional dict mapping field names to ``Metadata`` objects
 81            for describing individual fields to AI agents.
 82
 83    Example::
 84
 85        @wire_type("billing/Invoice")
 86        @dataclass
 87        class Invoice:
 88            customer: str = ""
 89            amount: float = 0.0
 90            paid: bool = False
 91
 92    Serialization rules:
 93
 94    - **Supported field types:** ``str``, ``int``, ``float``, ``bool``,
 95      ``bytes``, ``list[T]``, ``dict[str, V]``, ``Optional[T]``,
 96      and other ``@wire_type`` dataclasses.
 97
 98    - **Use ``Optional[T]`` for nullable fields, NOT ``T | None``.**
 99      The Fory serializer does not support PEP 604 union syntax.
100      This will fail at runtime::
101
102          bio: str | None = None          # WRONG -- Fory can't serialize this
103
104      Use this instead::
105
106          bio: Optional[str] = None       # CORRECT
107
108    - **Every field must have a default value.** Fory XLANG needs defaults
109      for cross-language compatibility. Use ``""`` for strings, ``0`` for
110      ints, ``False`` for bools, and ``field(default_factory=list)`` for
111      collections.
112
113    - **``@rpc`` methods must return a ``@wire_type`` dataclass, not None.**
114      Even if your response has no fields, define an empty response type::
115
116          @wire_type("myapp/AckResult")
117          @dataclass
118          class AckResult:
119              ok: bool = True
120
121    - **Generic wrapper types** (e.g., ``SignedRequest[T]``) are supported.
122      The ``@wire_type`` tag is on the outer class; the type parameter is
123      resolved at manifest extraction time.
124    """
125
126    def decorator(cls):
127        parts = tag.rsplit("/", 1)
128        if len(parts) == 2:
129            cls.__fory_namespace__ = parts[0]
130            cls.__fory_typename__ = parts[1]
131        else:
132            cls.__fory_namespace__ = ""
133            cls.__fory_typename__ = tag
134        cls.__wire_type__ = tag
135        if metadata:
136            cls.__wire_type_field_metadata__ = metadata
137        return cls
138
139    return decorator

Declare a stable wire identity for a dataclass.

The tag string is split on the last / into (namespace, typename). If there is no / the namespace is the empty string.

Use @wire_type when you need explicit control over the wire identity of a type (production services, cross-language compat). If omitted, the @service decorator auto-derives a tag from the module + class name at decoration time.

Args: tag: Wire type tag (e.g., "billing/Invoice"). metadata: Optional dict mapping field names to Metadata objects for describing individual fields to AI agents.

Example::

@wire_type("billing/Invoice")
@dataclass
class Invoice:
    customer: str = ""
    amount: float = 0.0
    paid: bool = False

Serialization rules:

  • Supported field types: str, int, float, bool, bytes, list[T], dict[str, V], Optional[T], and other @wire_type dataclasses.

  • Use Optional[T] for nullable fields, NOT T | None. The Fory serializer does not support PEP 604 union syntax. This will fail at runtime::

    bio: str | None = None          # WRONG -- Fory can't serialize this
    

    Use this instead::

    bio: Optional[str] = None       # CORRECT
    
  • Every field must have a default value. Fory XLANG needs defaults for cross-language compatibility. Use "" for strings, 0 for ints, False for bools, and field(default_factory=list) for collections.

  • @rpc methods must return a @wire_type dataclass, not None. Even if your response has no fields, define an empty response type::

    @wire_type("myapp/AckResult")
    @dataclass
    class AckResult:
        ok: bool = True
    
  • Generic wrapper types (e.g., SignedRequest[T]) are supported. The @wire_type tag is on the outer class; the type parameter is resolved at manifest extraction time.

def any_of(*roles: str) -> aster.contract.identity.CapabilityRequirement:
42def any_of(*roles: str) -> CapabilityRequirement:
43    """Require ANY ONE of the listed roles (OR logic).
44
45    The caller is admitted if they have at least one of the specified roles.
46    Accepts plain strings or str enum values.
47
48    Example::
49
50        @rpc(requires=any_of("ops.logs", "ops.admin"))
51        async def tail_logs(self, req): ...
52
53        @rpc(requires=any_of(Role.LOGS, Role.ADMIN))
54        async def tail_logs(self, req): ...
55    """
56    return CapabilityRequirement(
57        kind=CapabilityKind.ANY_OF,
58        roles=[_role_value(r) for r in roles],
59    )

Require ANY ONE of the listed roles (OR logic).

The caller is admitted if they have at least one of the specified roles. Accepts plain strings or str enum values.

Example::

@rpc(requires=any_of("ops.logs", "ops.admin"))
async def tail_logs(self, req): ...

@rpc(requires=any_of(Role.LOGS, Role.ADMIN))
async def tail_logs(self, req): ...
def all_of(*roles: str) -> aster.contract.identity.CapabilityRequirement:
62def all_of(*roles: str) -> CapabilityRequirement:
63    """Require ALL of the listed roles (AND logic).
64
65    The caller is admitted only if they have every specified role.
66    Accepts plain strings or str enum values.
67
68    Example::
69
70        @rpc(requires=all_of("ops.status", "ops.admin"))
71        async def admin_status(self, req): ...
72
73        @rpc(requires=all_of(Role.STATUS, Role.ADMIN))
74        async def admin_status(self, req): ...
75    """
76    return CapabilityRequirement(
77        kind=CapabilityKind.ALL_OF,
78        roles=[_role_value(r) for r in roles],
79    )

Require ALL of the listed roles (AND logic).

The caller is admitted only if they have every specified role. Accepts plain strings or str enum values.

Example::

@rpc(requires=all_of("ops.status", "ops.admin"))
async def admin_status(self, req): ...

@rpc(requires=all_of(Role.STATUS, Role.ADMIN))
async def admin_status(self, req): ...
class SerializationMode(enum.IntEnum):
14class SerializationMode(IntEnum):
15    """Serialization protocol negotiated between client and server."""
16
17    XLANG = 0
18    NATIVE = 1
19    ROW = 2
20    JSON = 3

Serialization protocol negotiated between client and server.

class RpcError(builtins.Exception):
 75class RpcError(Exception):
 76    """Exception raised when an RPC call fails.
 77
 78    Catch this in client code to handle server-side errors::
 79
 80        from aster import RpcError, StatusCode
 81
 82        try:
 83            resp = await svc.my_method(request)
 84        except RpcError as e:
 85            print(f"RPC failed: {e.code.name} -- {e.message}")
 86            if e.details:
 87                print(f"Details: {e.details}")
 88
 89    Attributes:
 90        code: The :class:`StatusCode` describing the failure category.
 91        message: A human-readable error description.
 92        details: Arbitrary string key/value pairs carrying extra context.
 93    """
 94
 95    def __init__(
 96        self,
 97        code: StatusCode,
 98        message: str = "",
 99        details: dict[str, str] | None = None,
100    ) -> None:
101        self.code = code
102        self.message = message
103        self.details: dict[str, str] = details or {}
104        super().__init__(f"[{code.name}] {message}")
105
106    def __repr__(self) -> str:
107        return (
108            f"RpcError(code={self.code!r}, message={self.message!r}, "
109            f"details={self.details!r})"
110        )
111
112    @classmethod
113    def from_status(
114        cls,
115        code: StatusCode,
116        message: str = "",
117        details: dict[str, str] | None = None,
118    ) -> "RpcError":
119        """Create the most specific RpcError subclass for a status code."""
120        exc_type = _RPC_ERROR_TYPES.get(code, RpcError)
121        return exc_type(message=message, details=details)

Exception raised when an RPC call fails.

Catch this in client code to handle server-side errors::

from aster import RpcError, StatusCode

try:
    resp = await svc.my_method(request)
except RpcError as e:
    print(f"RPC failed: {e.code.name} -- {e.message}")
    if e.details:
        print(f"Details: {e.details}")

Attributes: code: The StatusCode describing the failure category. message: A human-readable error description. details: Arbitrary string key/value pairs carrying extra context.

@classmethod
def from_status( cls, code: aster.status.StatusCode, message: str = '', details: dict[str, str] | None = None) -> aster.status.RpcError:
112    @classmethod
113    def from_status(
114        cls,
115        code: StatusCode,
116        message: str = "",
117        details: dict[str, str] | None = None,
118    ) -> "RpcError":
119        """Create the most specific RpcError subclass for a status code."""
120        exc_type = _RPC_ERROR_TYPES.get(code, RpcError)
121        return exc_type(message=message, details=details)

Create the most specific RpcError subclass for a status code.

class StatusCode(enum.IntEnum):
13class StatusCode(IntEnum):
14    """RPC status codes.
15
16    Codes 0-16 mirror gRPC's ``google.rpc.Code`` semantically. Codes
17    100+ are Aster-native and have no gRPC equivalent. The 17-99
18    range is reserved as a buffer in case gRPC ever extends its enum.
19
20    A common gripe with gRPC's status codes is that there are too
21    few of them to express the variety of failures real services
22    actually hit -- everything ends up shoehorned into a handful of
23    overloaded categories. The 100+ space gives Aster room to mint
24    more precise codes over time, signalling clearly that they are
25    intentionally separate from the gRPC vocabulary.
26
27    Use these to inspect errors returned by the server::
28
29        try:
30            resp = await svc.get_status(req)
31        except RpcError as e:
32            if e.code == StatusCode.NOT_FOUND:
33                print("Service not found")
34            elif e.code == StatusCode.CONTRACT_VIOLATION:
35                print("Wire shape doesn't match the contract")
36    """
37
38    # ── gRPC-mirrored codes (0-16) ───────────────────────────────────
39    OK = 0                    #: Success.
40    CANCELLED = 1             #: The call was cancelled by the client.
41    UNKNOWN = 2               #: Unknown error (server bug or unhandled exception).
42    INVALID_ARGUMENT = 3      #: Client sent an invalid request.
43    DEADLINE_EXCEEDED = 4     #: The call timed out.
44    NOT_FOUND = 5             #: Requested resource does not exist.
45    ALREADY_EXISTS = 6        #: Resource already exists (e.g., duplicate create).
46    PERMISSION_DENIED = 7     #: Caller lacks required capability.
47    RESOURCE_EXHAUSTED = 8    #: Rate limit or quota exceeded.
48    FAILED_PRECONDITION = 9   #: Precondition not met (e.g., wrong state).
49    ABORTED = 10              #: Operation aborted (e.g., concurrency conflict).
50    OUT_OF_RANGE = 11         #: Value outside valid range.
51    UNIMPLEMENTED = 12        #: Method not implemented by the server.
52    INTERNAL = 13             #: Internal server error.
53    UNAVAILABLE = 14          #: Server temporarily unavailable (retry later).
54    DATA_LOSS = 15            #: Unrecoverable data loss.
55    UNAUTHENTICATED = 16      #: No valid credentials provided.
56
57    # ── Reserved range (17-99) ───────────────────────────────────────
58    # Intentionally left empty -- reserved as a buffer in case gRPC
59    # ever extends its enum upward. New Aster-native codes start at
60    # 101 and grow from there.
61
62    # ── Aster-native codes (100+) ────────────────────────────────────
63    #: The wire payload doesn't match the published contract -- e.g.
64    #: the JSON dict has fields the server's @wire_type dataclass
65    #: doesn't declare, or vice versa. Distinct from INVALID_ARGUMENT
66    #: because the violation is about the SHAPE of the data, not its
67    #: value, and because shape violations can occur at any nesting
68    #: depth (a top-level INVALID_ARGUMENT label doesn't apply when
69    #: the bad field is two objects deep). The producer owns the
70    #: contract; consumers must use the field names defined by the
71    #: producer's manifest.
72    CONTRACT_VIOLATION = 101

RPC status codes.

Codes 0-16 mirror gRPC's google.rpc.Code semantically. Codes 100+ are Aster-native and have no gRPC equivalent. The 17-99 range is reserved as a buffer in case gRPC ever extends its enum.

A common gripe with gRPC's status codes is that there are too few of them to express the variety of failures real services actually hit -- everything ends up shoehorned into a handful of overloaded categories. The 100+ space gives Aster room to mint more precise codes over time, signalling clearly that they are intentionally separate from the gRPC vocabulary.

Use these to inspect errors returned by the server::

try:
    resp = await svc.get_status(req)
except RpcError as e:
    if e.code == StatusCode.NOT_FOUND:
        print("Service not found")
    elif e.code == StatusCode.CONTRACT_VIOLATION:
        print("Wire shape doesn't match the contract")
class AdmissionDeniedError(builtins.PermissionError):
 96class AdmissionDeniedError(PermissionError):
 97    """Raised when a consumer is refused by the server's admission check.
 98
 99    The server never reveals *why* admission failed (no oracle leak), so this
100    exception enumerates the common causes as a hint to the user rather than
101    a precise diagnosis.
102    """
103
104    def __init__(
105        self,
106        *,
107        had_credential: bool,
108        credential_file: str | None,
109        our_endpoint_id: str,
110        server_address: str,
111    ) -> None:
112        self.had_credential = had_credential
113        self.credential_file = credential_file
114        self.our_endpoint_id = our_endpoint_id
115        self.server_address = server_address
116        super().__init__(self.format_hint())
117
118    def format_hint(self) -> str:
119        """Return a multi-line actionable hint suitable for CLI output."""
120        short_id = (self.our_endpoint_id[:16] + "...") if self.our_endpoint_id else "<unknown>"
121        if not self.had_credential:
122            return (
123                "consumer admission denied -- this server requires a credential.\n"
124                "  - Get an enrollment credential file (.cred) from the server's operator.\n"
125                "  - Then retry with: --rcan <path/to/file.cred>\n"
126                "    (or set ASTER_ENROLLMENT_CREDENTIAL=<path> in the environment)"
127            )
128        cred_label = self.credential_file or "<credential>"
129        return (
130            f"consumer admission denied -- the server rejected your credential.\n"
131            f"  credential: {cred_label}\n"
132            f"  your node:  {short_id}\n"
133            "  Common causes:\n"
134            "    1. The credential expired (check the 'Expires' field on the file).\n"
135            "    2. The credential was issued to a DIFFERENT node. Credentials are\n"
136            "       bound to a single endpoint id: if you copied this file from\n"
137            "       another machine/process, the server sees a different node id\n"
138            "       and refuses admission. Ask the operator to re-issue it for\n"
139            f"       endpoint_id={short_id}.\n"
140            "    3. The server trusts a different root key than the one that signed\n"
141            "       this credential.\n"
142            "    4. The credential's role/capabilities don't match this server's\n"
143            "       policy (the server may reject unknown capabilities outright)."
144        )

Raised when a consumer is refused by the server's admission check.

The server never reveals why admission failed (no oracle leak), so this exception enumerates the common causes as a hint to the user rather than a precise diagnosis.

def format_hint(self) -> str:
118    def format_hint(self) -> str:
119        """Return a multi-line actionable hint suitable for CLI output."""
120        short_id = (self.our_endpoint_id[:16] + "...") if self.our_endpoint_id else "<unknown>"
121        if not self.had_credential:
122            return (
123                "consumer admission denied -- this server requires a credential.\n"
124                "  - Get an enrollment credential file (.cred) from the server's operator.\n"
125                "  - Then retry with: --rcan <path/to/file.cred>\n"
126                "    (or set ASTER_ENROLLMENT_CREDENTIAL=<path> in the environment)"
127            )
128        cred_label = self.credential_file or "<credential>"
129        return (
130            f"consumer admission denied -- the server rejected your credential.\n"
131            f"  credential: {cred_label}\n"
132            f"  your node:  {short_id}\n"
133            "  Common causes:\n"
134            "    1. The credential expired (check the 'Expires' field on the file).\n"
135            "    2. The credential was issued to a DIFFERENT node. Credentials are\n"
136            "       bound to a single endpoint id: if you copied this file from\n"
137            "       another machine/process, the server sees a different node id\n"
138            "       and refuses admission. Ask the operator to re-issue it for\n"
139            f"       endpoint_id={short_id}.\n"
140            "    3. The server trusts a different root key than the one that signed\n"
141            "       this credential.\n"
142            "    4. The credential's role/capabilities don't match this server's\n"
143            "       policy (the server may reject unknown capabilities outright)."
144        )

Return a multi-line actionable hint suitable for CLI output.

class ContractViolationError(aster.public.RpcError):
204class ContractViolationError(RpcError):
205    """Raised when the wire payload doesn't match the published contract.
206
207    Carries the offending field names in ``details`` under the key
208    ``unexpected_fields`` (comma-separated, sanitized via repr).
209    """
210
211    def __init__(self, message: str = "", details: dict[str, str] | None = None) -> None:
212        super().__init__(StatusCode.CONTRACT_VIOLATION, message, details)

Raised when the wire payload doesn't match the published contract.

Carries the offending field names in details under the key unexpected_fields (comma-separated, sanitized via repr).

@dataclass
class CallContext:
16@dataclass
17class CallContext:
18    """Context for a single RPC call, available to interceptors and handlers.
19
20    Passed to every interceptor in the chain. Read ``service`` and ``method``
21    to know which RPC is being called. Use ``metadata`` to pass headers
22    between client and server. Check ``remaining_seconds`` for deadline
23    awareness.
24
25    Attributes:
26        service: The service name (e.g., ``"MissionControl"``).
27        method: The method name (e.g., ``"getStatus"``).
28        call_id: Unique ID for this call (auto-generated UUID).
29        peer: Remote peer identifier (endpoint ID hex).
30        metadata: Key/value headers sent with the call.
31        attributes: Enrollment attributes from the consumer's credential.
32        deadline: Absolute deadline as epoch timestamp, or ``None``.
33        is_streaming: ``True`` for streaming RPC patterns.
34        pattern: RPC pattern (``"unary"``, ``"server_stream"``, etc.).
35        idempotent: ``True`` if the method is safe to retry.
36        attempt: Current retry attempt number (starts at 1).
37    """
38
39    service: str
40    method: str
41    call_id: str = field(default_factory=lambda: str(uuid.uuid4()))
42    session_id: str | None = None
43    peer: str | None = None
44    metadata: dict[str, str] = field(default_factory=dict)
45    attributes: dict[str, str] = field(default_factory=dict)
46    deadline: float | None = None
47    is_streaming: bool = False
48    pattern: str | None = None
49    idempotent: bool = False
50    attempt: int = 1
51
52    @property
53    def remaining_seconds(self) -> float | None:
54        if self.deadline is None:
55            return None
56        return max(0.0, self.deadline - time.time())
57
58    @property
59    def expired(self) -> bool:
60        remaining = self.remaining_seconds
61        return remaining is not None and remaining <= 0.0

Context for a single RPC call, available to interceptors and handlers.

Passed to every interceptor in the chain. Read service and method to know which RPC is being called. Use metadata to pass headers between client and server. Check remaining_seconds for deadline awareness.

Attributes: service: The service name (e.g., "MissionControl"). method: The method name (e.g., "getStatus"). call_id: Unique ID for this call (auto-generated UUID). peer: Remote peer identifier (endpoint ID hex). metadata: Key/value headers sent with the call. attributes: Enrollment attributes from the consumer's credential. deadline: Absolute deadline as epoch timestamp, or None. is_streaming: True for streaming RPC patterns. pattern: RPC pattern ("unary", "server_stream", etc.). idempotent: True if the method is safe to retry. attempt: Current retry attempt number (starts at 1).

class Interceptor(abc.ABC):
64class Interceptor(ABC):
65    """Base interceptor interface."""
66
67    async def on_request(self, ctx: CallContext, request: object) -> object:
68        return request
69
70    async def on_response(self, ctx: CallContext, response: object) -> object:
71        return response
72
73    async def on_error(self, ctx: CallContext, error: RpcError) -> RpcError | None:
74        return error

Base interceptor interface.

class DeadlineInterceptor(aster.public.Interceptor):
18class DeadlineInterceptor(Interceptor):
19    """Validates and enforces call deadlines.
20
21    Args:
22        skew_tolerance_ms: Milliseconds of clock-skew tolerance added to the
23            deadline when checking on receipt.  A request whose deadline has
24            already passed by more than this tolerance is rejected immediately
25            with ``DEADLINE_EXCEEDED``.  Defaults to 5000 ms (5 seconds).
26    """
27
28    def __init__(self, skew_tolerance_ms: int = 5000) -> None:
29        self._skew_tolerance_ms = skew_tolerance_ms
30
31    async def on_request(self, ctx: CallContext, request: object) -> object:
32        if ctx.deadline is not None:
33            now_epoch_ms = int(time.time() * 1000)
34            deadline_epoch_ms = int(ctx.deadline * 1000)
35            # Reject on receipt if expired beyond skew tolerance (S6.8.1)
36            if now_epoch_ms > deadline_epoch_ms + self._skew_tolerance_ms:
37                raise RpcError(
38                    StatusCode.DEADLINE_EXCEEDED,
39                    "deadline already expired on receipt "
40                    f"(now={now_epoch_ms}, deadline={deadline_epoch_ms}, "
41                    f"skew_tolerance={self._skew_tolerance_ms}ms)",
42                )
43            # Standard expiry check (no tolerance)
44            if ctx.expired:
45                raise RpcError(StatusCode.DEADLINE_EXCEEDED, "deadline exceeded")
46        return request
47
48    def timeout_seconds(self, ctx: CallContext) -> float | None:
49        """Return remaining seconds until deadline, or None if no deadline set."""
50        remaining = ctx.remaining_seconds
51        if remaining is None:
52            return None
53        return max(0.0, remaining)

Validates and enforces call deadlines.

Args: skew_tolerance_ms: Milliseconds of clock-skew tolerance added to the deadline when checking on receipt. A request whose deadline has already passed by more than this tolerance is rejected immediately with DEADLINE_EXCEEDED. Defaults to 5000 ms (5 seconds).

def timeout_seconds(self, ctx: aster.interceptors.base.CallContext) -> float | None:
48    def timeout_seconds(self, ctx: CallContext) -> float | None:
49        """Return remaining seconds until deadline, or None if no deadline set."""
50        remaining = ctx.remaining_seconds
51        if remaining is None:
52            return None
53        return max(0.0, remaining)

Return remaining seconds until deadline, or None if no deadline set.

class AuthInterceptor(aster.public.Interceptor):
12class AuthInterceptor(Interceptor):
13    """Injects and/or validates auth metadata."""
14
15    def __init__(
16        self,
17        *,
18        token_provider: str | Callable[[], str] | None = None,
19        validator: str | Callable[[str], bool] | None = None,
20        metadata_key: str = "authorization",
21        scheme: str | None = "Bearer",
22    ) -> None:
23        self._token_provider = token_provider
24        self._validator = validator
25        self._metadata_key = metadata_key
26        self._scheme = scheme
27
28    async def on_request(self, ctx: CallContext, request: object) -> object:
29        if self._token_provider is not None and self._metadata_key not in ctx.metadata:
30            token = self._token_provider() if callable(self._token_provider) else self._token_provider
31            ctx.metadata[self._metadata_key] = (
32                f"{self._scheme} {token}" if self._scheme else str(token)
33            )
34
35        if self._validator is not None:
36            raw_token = ctx.metadata.get(self._metadata_key, "")
37            token = raw_token
38            prefix = f"{self._scheme} " if self._scheme else ""
39            if prefix and raw_token.startswith(prefix):
40                token = raw_token[len(prefix):]
41            valid = self._validator(token) if callable(self._validator) else token == self._validator
42            if not valid:
43                raise RpcError(StatusCode.UNAUTHENTICATED, "authentication failed")
44
45        return request

Injects and/or validates auth metadata.

class RetryInterceptor(aster.public.Interceptor):
13class RetryInterceptor(Interceptor):
14    """Provides retry policy hints for client calls."""
15
16    def __init__(
17        self,
18        policy: RetryPolicy | None = None,
19        retryable_codes: set[StatusCode] | None = None,
20    ) -> None:
21        self.policy = policy or RetryPolicy()
22        self.retryable_codes = retryable_codes or {StatusCode.UNAVAILABLE}
23
24    def should_retry(self, ctx: CallContext, error: RpcError) -> bool:
25        return ctx.idempotent and error.code in self.retryable_codes
26
27    def backoff_seconds(self, attempt: int) -> float:
28        backoff = self.policy.backoff
29        delay_ms = min(
30            backoff.max_ms,
31            int(backoff.initial_ms * (backoff.multiplier ** max(0, attempt - 1))),
32        )
33        jitter = delay_ms * backoff.jitter * random.random()
34        return (delay_ms + jitter) / 1000.0

Provides retry policy hints for client calls.

class CircuitBreakerInterceptor(aster.public.Interceptor):
12class CircuitBreakerInterceptor(Interceptor):
13    """Simple CLOSED → OPEN → HALF_OPEN circuit breaker."""
14
15    CLOSED = "closed"
16    OPEN = "open"
17    HALF_OPEN = "half_open"
18
19    def __init__(
20        self,
21        *,
22        failure_threshold: int = 3,
23        recovery_timeout: float = 5.0,
24        half_open_max_calls: int = 1,
25    ) -> None:
26        self.failure_threshold = failure_threshold
27        self.recovery_timeout = recovery_timeout
28        self.half_open_max_calls = half_open_max_calls
29        self.state = self.CLOSED
30        self.failure_count = 0
31        self.opened_at = 0.0
32        self.half_open_calls = 0
33
34    def before_call(self, ctx: CallContext) -> None:
35        now = time.monotonic()
36        if self.state == self.OPEN:
37            if now - self.opened_at >= self.recovery_timeout:
38                self.state = self.HALF_OPEN
39                self.half_open_calls = 0
40            else:
41                raise RpcError(StatusCode.UNAVAILABLE, "circuit breaker is open")
42
43        if self.state == self.HALF_OPEN:
44            if self.half_open_calls >= self.half_open_max_calls:
45                raise RpcError(StatusCode.UNAVAILABLE, "circuit breaker is half-open")
46            self.half_open_calls += 1
47
48    def record_success(self) -> None:
49        self.failure_count = 0
50        self.half_open_calls = 0
51        self.state = self.CLOSED
52
53    def record_failure(self, error: RpcError) -> None:
54        if error.code not in {StatusCode.UNAVAILABLE, StatusCode.INTERNAL, StatusCode.UNKNOWN}:
55            return
56        if self.state == self.HALF_OPEN:
57            self.state = self.OPEN
58            self.opened_at = time.monotonic()
59            self.half_open_calls = 0
60            return
61        self.failure_count += 1
62        if self.failure_count >= self.failure_threshold:
63            self.state = self.OPEN
64            self.opened_at = time.monotonic()

Simple CLOSED → OPEN → HALF_OPEN circuit breaker.

class AuditLogInterceptor(aster.public.Interceptor):
14class AuditLogInterceptor(Interceptor):
15    """Captures structured audit events for requests, responses, and errors."""
16
17    def __init__(self, *, sink: list[dict[str, Any]] | None = None, logger: logging.Logger | None = None) -> None:
18        self.sink = sink if sink is not None else []
19        self.logger = logger or logging.getLogger(__name__)
20
21    def _record(self, event: str, ctx: CallContext, **extra: Any) -> None:
22        entry = {
23            "event": event,
24            "service": ctx.service,
25            "method": ctx.method,
26            "call_id": ctx.call_id,
27            "attempt": ctx.attempt,
28            "ts": time.time(),
29            **extra,
30        }
31        self.sink.append(entry)
32        self.logger.debug("audit=%s", entry)
33
34    async def on_request(self, ctx: CallContext, request: object) -> object:
35        self._record("request", ctx)
36        return request
37
38    async def on_response(self, ctx: CallContext, response: object) -> object:
39        self._record("response", ctx)
40        return response
41
42    async def on_error(self, ctx: CallContext, error: RpcError) -> RpcError | None:
43        self._record("error", ctx, code=error.code.name, message=error.message)
44        return error

Captures structured audit events for requests, responses, and errors.

class MetricsInterceptor(aster.public.Interceptor):
 42class MetricsInterceptor(Interceptor):
 43    """Collects RED metrics and creates OTel spans for each RPC call.
 44
 45    Metrics collected:
 46      - ``aster.rpc.started`` -- counter, labels: service, method, pattern
 47      - ``aster.rpc.completed`` -- counter, labels: service, method, status
 48      - ``aster.rpc.duration`` -- histogram (seconds), labels: service, method
 49
 50    Tracing:
 51      - One span per RPC call: ``{service}/{method}``
 52      - Span attributes: rpc.service, rpc.method, rpc.system, rpc.status_code
 53
 54    Falls back to simple in-memory counters when OTel is not installed.
 55    """
 56
 57    def __init__(self) -> None:
 58        # In-memory fallback counters (always available)
 59        self.started = 0
 60        self.succeeded = 0
 61        self.failed = 0
 62
 63        # Try to set up OTel
 64        self._tracer: Any = None
 65        self._meter: Any = None
 66        self._started_counter: Any = None
 67        self._completed_counter: Any = None
 68        self._duration_histogram: Any = None
 69
 70        try:
 71            from opentelemetry import trace, metrics  # type: ignore
 72
 73            self._tracer = trace.get_tracer("aster.rpc", "0.2.0")
 74            self._meter = metrics.get_meter("aster.rpc", "0.2.0")
 75
 76            self._started_counter = self._meter.create_counter(
 77                "aster.rpc.started",
 78                description="Total RPC calls started",
 79                unit="1",
 80            )
 81            self._completed_counter = self._meter.create_counter(
 82                "aster.rpc.completed",
 83                description="Total RPC calls completed",
 84                unit="1",
 85            )
 86            self._duration_histogram = self._meter.create_histogram(
 87                "aster.rpc.duration",
 88                description="RPC call duration",
 89                unit="s",
 90            )
 91        except Exception:
 92            pass  # OTel not installed -- use fallback counters
 93
 94        # Track start times per call_id for duration calculation
 95        self._call_starts: dict[str, float] = {}
 96
 97    @property
 98    def has_otel(self) -> bool:
 99        """Whether OpenTelemetry is available and configured."""
100        return self._tracer is not None
101
102    async def on_request(self, ctx: CallContext, request: object) -> object:
103        self.started += 1
104
105        labels = {
106            "service": ctx.service,
107            "method": ctx.method,
108            "pattern": ctx.pattern or "unary",
109        }
110
111        # OTel counter
112        if self._started_counter:
113            self._started_counter.add(1, labels)
114
115        # Start timing
116        call_key = f"{ctx.service}.{ctx.method}.{id(request)}"
117        self._call_starts[call_key] = time.monotonic()
118
119        # Start OTel span
120        if self._tracer:
121            from opentelemetry import trace  # type: ignore
122
123            span = self._tracer.start_span(
124                f"{ctx.service}/{ctx.method}",
125                kind=trace.SpanKind.SERVER,
126                attributes={
127                    "rpc.system": "aster",
128                    "rpc.service": ctx.service,
129                    "rpc.method": ctx.method,
130                    "rpc.aster.pattern": ctx.pattern or "unary",
131                    "rpc.aster.idempotent": ctx.idempotent,
132                },
133            )
134            # Store span on context for on_response/on_error
135            ctx._otel_span = span  # type: ignore[attr-defined]
136            ctx._otel_call_key = call_key  # type: ignore[attr-defined]
137
138        # Set correlation ID for structured logging
139        from aster.logging import set_request_id
140        set_request_id(ctx.call_id or f"{ctx.service}.{ctx.method}")
141
142        return request
143
144    async def on_response(self, ctx: CallContext, response: object) -> object:
145        self.succeeded += 1
146
147        labels = {"service": ctx.service, "method": ctx.method, "status": "OK"}
148
149        if self._completed_counter:
150            self._completed_counter.add(1, labels)
151
152        # Record duration
153        call_key = getattr(ctx, "_otel_call_key", None)
154        if call_key and call_key in self._call_starts:
155            duration = time.monotonic() - self._call_starts.pop(call_key)
156            if self._duration_histogram:
157                self._duration_histogram.record(
158                    duration, {"service": ctx.service, "method": ctx.method}
159                )
160
161        # End OTel span
162        span = getattr(ctx, "_otel_span", None)
163        if span:
164            from opentelemetry.trace import StatusCode as OtelStatus  # type: ignore
165            span.set_status(OtelStatus.OK)
166            span.end()
167
168        return response
169
170    async def on_error(self, ctx: CallContext, error: RpcError) -> RpcError | None:
171        self.failed += 1
172
173        labels = {
174            "service": ctx.service,
175            "method": ctx.method,
176            "status": error.code.name if hasattr(error.code, "name") else str(error.code),
177        }
178
179        if self._completed_counter:
180            self._completed_counter.add(1, labels)
181
182        # Record duration
183        call_key = getattr(ctx, "_otel_call_key", None)
184        if call_key and call_key in self._call_starts:
185            duration = time.monotonic() - self._call_starts.pop(call_key)
186            if self._duration_histogram:
187                self._duration_histogram.record(
188                    duration, {"service": ctx.service, "method": ctx.method}
189                )
190
191        # End OTel span with error
192        span = getattr(ctx, "_otel_span", None)
193        if span:
194            from opentelemetry.trace import StatusCode as OtelStatus  # type: ignore
195            span.set_status(OtelStatus.ERROR, str(error.message))
196            span.set_attribute("rpc.aster.error_code", str(error.code))
197            span.record_exception(error)
198            span.end()
199
200        return error
201
202    def snapshot(self) -> dict[str, int]:
203        """Return a snapshot of in-memory counters."""
204        return {
205            "started": self.started,
206            "succeeded": self.succeeded,
207            "failed": self.failed,
208            "in_flight": self.started - self.succeeded - self.failed,
209        }

Collects RED metrics and creates OTel spans for each RPC call.

Metrics collected:

  • aster.rpc.started -- counter, labels: service, method, pattern
  • aster.rpc.completed -- counter, labels: service, method, status
  • aster.rpc.duration -- histogram (seconds), labels: service, method

Tracing:

  • One span per RPC call: {service}/{method}
  • Span attributes: rpc.service, rpc.method, rpc.system, rpc.status_code

Falls back to simple in-memory counters when OTel is not installed.

has_otel: bool
 97    @property
 98    def has_otel(self) -> bool:
 99        """Whether OpenTelemetry is available and configured."""
100        return self._tracer is not None

Whether OpenTelemetry is available and configured.

def snapshot(self) -> dict[str, int]:
202    def snapshot(self) -> dict[str, int]:
203        """Return a snapshot of in-memory counters."""
204        return {
205            "started": self.started,
206            "succeeded": self.succeeded,
207            "failed": self.failed,
208            "in_flight": self.started - self.succeeded - self.failed,
209        }

Return a snapshot of in-memory counters.