1""" 2Aster RPC -- Public API Reference 3================================= 4 5This module re-exports the public API surface for documentation. 6Import from ``aster`` directly in your code, not from ``aster.public``. 7 8Getting Started 9--------------- 10 11**Producer** (server):: 12 13 from aster import AsterServer, service, rpc, wire_type 14 from dataclasses import dataclass 15 16 @wire_type("myapp/GreetRequest") 17 @dataclass 18 class GreetRequest: 19 name: str = "" 20 21 @wire_type("myapp/GreetResponse") 22 @dataclass 23 class GreetResponse: 24 message: str = "" 25 26 @service(name="Greeter", version=1) 27 class Greeter: 28 @rpc 29 async def greet(self, req: GreetRequest) -> GreetResponse: 30 return GreetResponse(message=f"Hello, {req.name}!") 31 32 async with AsterServer(services=[Greeter()]) as srv: 33 print(srv.address) # share this with consumers 34 await srv.serve() 35 36**Consumer** (client):: 37 38 from aster import AsterClient 39 40 client = AsterClient(address="aster1...") 41 await client.connect() 42 43 # Use the shell for exploration: 44 # aster shell aster1... 45 46 # Or generate a typed client: 47 # aster contract gen-client aster1... 
--out ./clients --lang python --package myapp 48 49Decorators 50---------- 51 52Use these to define RPC services: 53 54- :func:`service` -- Declare a class as an RPC service 55- :func:`rpc` -- Mark a method as a unary RPC endpoint 56- :func:`server_stream` -- Mark a method as server-streaming 57- :func:`client_stream` -- Mark a method as client-streaming 58- :func:`bidi_stream` -- Mark a method as bidirectional streaming 59- :func:`wire_type` -- Register a dataclass for cross-language serialization 60 61Authorization 62------------- 63 64Compose capability requirements: 65 66- :func:`any_of` -- caller must have at least one of the listed capabilities 67- :func:`all_of` -- caller must have every listed capability 68 69Serialization 70------------- 71 72- :class:`SerializationMode` -- pick between XLANG (cross-language Fory), 73 NATIVE (single-language Fory), ROW (zero-copy Fory rows for data-heavy 74 workloads), and JSON (human-readable wire format for debugging and the 75 dynamic proxy client). 76 77Error Handling 78-------------- 79 80All RPC failures raise :class:`RpcError` with a :class:`StatusCode`. 81Specific subclasses give actionable diagnostics: 82 83- :class:`AdmissionDeniedError` -- raised when the server refuses 84 consumer admission (open-gate vs trusted-mode mismatch, expired 85 credential, endpoint id mismatch). The error message enumerates the 86 common causes. 87- :class:`ContractViolationError` -- raised when the strict-mode codec 88 rejects a payload whose shape doesn't match the contract. 89 90Interceptors 91------------ 92 93Built-in middleware that wraps every RPC call. Configure them on the 94``AsterServer`` or ``AsterClient`` constructor. 
95 96- :class:`CallContext` -- per-call context object passed through the chain 97- :class:`Interceptor` -- base class for custom interceptors 98- :class:`DeadlineInterceptor` -- enforce per-call deadlines 99- :class:`AuthInterceptor` -- token-based authentication 100- :class:`RetryInterceptor` -- automatic retry for idempotent methods 101- :class:`CircuitBreakerInterceptor` -- circuit breaker for failing endpoints 102- :class:`AuditLogInterceptor` -- log every RPC call for audit 103- :class:`MetricsInterceptor` -- collect call latency and error metrics 104""" 105 106from aster.runtime import AsterServer, AsterClient, AdmissionDeniedError 107from aster.decorators import service, rpc, server_stream, client_stream, bidi_stream 108from aster.codec import wire_type 109from aster.status import RpcError, StatusCode, ContractViolationError 110from aster.rpc_types import SerializationMode 111from aster.config import AsterConfig 112from aster.capabilities import any_of, all_of 113from aster.interceptors import ( 114 CallContext, 115 Interceptor, 116 DeadlineInterceptor, 117 AuthInterceptor, 118 RetryInterceptor, 119 CircuitBreakerInterceptor, 120 AuditLogInterceptor, 121 MetricsInterceptor, 122) 123 124 125__all__ = [ 126 # Server / client 127 "AsterServer", 128 "AsterClient", 129 "AsterConfig", 130 # Decorators 131 "service", 132 "rpc", 133 "server_stream", 134 "client_stream", 135 "bidi_stream", 136 "wire_type", 137 # Authorization 138 "any_of", 139 "all_of", 140 # Serialization 141 "SerializationMode", 142 # Errors 143 "RpcError", 144 "StatusCode", 145 "AdmissionDeniedError", 146 "ContractViolationError", 147 # Interceptors 148 "CallContext", 149 "Interceptor", 150 "DeadlineInterceptor", 151 "AuthInterceptor", 152 "RetryInterceptor", 153 "CircuitBreakerInterceptor", 154 "AuditLogInterceptor", 155 "MetricsInterceptor", 156]
150class AsterServer: 151 """High-level, declarative producer. 152 153 Builds a single :class:`IrohNode` that serves blobs + docs + gossip 154 (iroh built-in protocols) alongside aster RPC (``aster/1``) and any 155 enabled admission ALPNs -- all on **one endpoint, one node ID**. 156 157 When any admission gate is active (``allow_all_consumers=False`` or 158 ``allow_all_producers=False``), the node is built with 159 ``enable_hooks=True`` and a background task runs the Gate 0 160 connection-level hook loop (``MeshEndpointHook.run_hook_loop``), which 161 gates *all* protocols (blobs, docs, gossip, aster/1, admission) at the 162 QUIC handshake layer. 163 """ 164 165 def __init__( 166 self, 167 services: list, 168 *, 169 config: "AsterConfig | None" = None, 170 peer: str | None = None, 171 identity: str | None = None, 172 # Inline overrides (take priority over config): 173 root_pubkey: bytes | None = None, 174 allow_all_consumers: bool | None = None, 175 allow_all_producers: bool | None = None, 176 endpoint_config: EndpointConfig | None = None, 177 # Internal wiring: 178 channel_name: str = "rpc", 179 codec: Any | None = None, 180 interceptors: list[Any] | None = None, 181 hook: MeshEndpointHook | None = None, 182 nonce_store: Any | None = None, 183 registry_namespace: str = "", 184 mesh_state: MeshState | None = None, 185 clock_drift_config: ClockDriftConfig | None = None, 186 persist_mesh_state: bool = False, 187 ) -> None: 188 """Create an Aster RPC server. 189 190 .. note:: **Interceptors are not wired by default.** The server ships 191 with interceptors for rate limiting, deadline enforcement, auth, 192 capability checks, circuit breaking, metrics, audit logging, and 193 retry hints -- but none are active unless you pass them via the 194 ``interceptors`` parameter. For production use, wire at minimum 195 ``DeadlineInterceptor`` and ``RateLimitInterceptor``. All 196 interceptors live in ``aster.interceptors``. 
197 198 Args: 199 services: List of ``@service``-decorated class instances to serve. 200 At least one is required. 201 config: Optional :class:`AsterConfig` for trust, storage, and 202 networking settings. If omitted, settings are loaded from 203 environment variables and defaults. 204 peer: Optional peer name for this server (used in config lookup 205 and identity file resolution). 206 identity: Path to ``.aster-identity`` file (default: auto-detected 207 from CWD). Overrides ``config.identity_file``. 208 root_pubkey: 32-byte ed25519 public key for the trust anchor. 209 Overrides ``config.root_pubkey`` if both are set. 210 allow_all_consumers: If ``True``, skip consumer admission 211 (open gate). Overrides ``config.allow_all_consumers``. 212 allow_all_producers: If ``True``, skip producer admission. 213 Overrides ``config.allow_all_producers``. 214 endpoint_config: Low-level iroh endpoint configuration. 215 216 Example:: 217 218 @service(name="MyService", version=1) 219 class MyService: 220 @rpc() 221 async def hello(self, req: HelloRequest) -> HelloResponse: 222 return HelloResponse(message=f"Hello {req.name}") 223 224 async with AsterServer(services=[MyService()]) as srv: 225 print(srv.address) 226 await srv.serve() 227 """ 228 if not services: 229 raise ValueError("AsterServer requires at least one service") 230 231 # Auto-load config from env if none provided. 232 from .config import AsterConfig 233 if config is None: 234 config = AsterConfig.from_env() 235 if identity is not None: 236 config.identity_file = identity 237 self._config = config 238 239 # Inline overrides win over config. 240 self._allow_all_consumers = ( 241 allow_all_consumers if allow_all_consumers is not None 242 else config.allow_all_consumers 243 ) 244 self._allow_all_producers = ( 245 allow_all_producers if allow_all_producers is not None 246 else config.allow_all_producers 247 ) 248 249 # Load identity file if present (.aster-identity). 
250 self._peer_name = peer 251 secret_key_from_identity, peer_entry = config.load_identity( 252 peer_name=peer, role="producer" 253 ) 254 if peer_entry and not root_pubkey: 255 # Root pubkey comes from the credential in the identity file. 256 root_pubkey = bytes.fromhex(peer_entry["root_pubkey"]) 257 if secret_key_from_identity and not config.secret_key: 258 import base64 as _b64 259 config.secret_key = secret_key_from_identity 260 261 # Resolve root public key: inline > identity file > config file > ephemeral. 262 # The root private key is NEVER on a running node (trust spec §1.1). 263 pub = config.resolve_root_pubkey() 264 self._root_pubkey = root_pubkey if root_pubkey is not None else pub 265 266 # Dev mode: if using an ephemeral root key (no explicit pubkey file), 267 # auto-open the consumer gate so the quickstart works without 268 # credential files. In production (explicit root_pubkey_file), 269 # the default allow_all_consumers=False requires credentials. 270 if ( 271 config._ephemeral_privkey is not None 272 and allow_all_consumers is None 273 and config.root_pubkey_file is None 274 ): 275 self._allow_all_consumers = True 276 logger.info( 277 "Dev mode: allow_all_consumers=True (ephemeral root key). " 278 "Set ASTER_ROOT_PUBKEY_FILE for production admission." 279 ) 280 281 if (not self._allow_all_consumers or not self._allow_all_producers) and self._root_pubkey is None: 282 raise ValueError( 283 "root_pubkey is required when admission is enabled " 284 "(allow_all_consumers=False or allow_all_producers=False). " 285 "Set ASTER_ROOT_PUBKEY_FILE or pass root_pubkey= explicitly." 
286 ) 287 288 self._services_in: list = list(services) 289 self._endpoint_config_template = endpoint_config or config.to_endpoint_config() 290 self._channel_name = channel_name 291 self._codec = codec 292 from aster.interceptors.deadline import DeadlineInterceptor 293 if interceptors is not None: 294 self._interceptors = list(interceptors) 295 else: 296 self._interceptors = [DeadlineInterceptor()] 297 self._hook = hook 298 self._nonce_store = nonce_store 299 300 # Admission → dispatch bridge: stores per-peer attributes 301 from aster.peer_store import PeerAttributeStore 302 self._peer_store = PeerAttributeStore() 303 self._registry_namespace = registry_namespace 304 self._mesh_state = mesh_state 305 self._clock_drift_config = clock_drift_config 306 self._persist_mesh_state = persist_mesh_state 307 308 # Populated by start() 309 self._started: bool = False 310 self._node: IrohNode | None = None 311 self._service_summaries: list[ServiceSummary] = [] 312 self._server: Server | None = None 313 # Lazy caches for .blobs / .docs / .gossip 314 self._blobs: Any | None = None 315 self._docs: Any | None = None 316 self._gossip: Any | None = None 317 318 # Populated by serve() 319 self._serve_task: asyncio.Task | None = None 320 self._subtasks: list[asyncio.Task] = [] 321 self._closed: bool = False 322 323 # Producer service tokens for @aster endpoint registration. 324 self._producer_tokens: dict[str, dict] = {} # service_name -> token dict 325 self._load_producer_tokens() 326 327 # Delegation policies for aster.admission ALPN. 328 # Built from published_services entries that have aster_root_pubkey. 329 self._delegation_policies: dict[str, Any] = {} # service_name -> policy 330 self._load_delegation_policies() 331 332 # ── Lifecycle ──────────────────────────────────────────────────────────── 333 334 async def start(self) -> None: 335 """Create the unified node and compute ``ServiceSummary`` list. 
Idempotent.""" 336 if self._started: 337 return 338 339 # Configure structured logging from config (idempotent) 340 from aster.logging import configure_logging 341 configure_logging( 342 format=self._config.log_format, 343 level=self._config.log_level, 344 mask=self._config.log_mask, 345 ) 346 347 # Determine which aster ALPNs to register on the Router. 348 # Consumer admission is ALWAYS registered -- even in open-gate mode 349 # the consumer uses it to discover services. 350 aster_alpns: list[bytes] = [RPC_ALPN, ALPN_CONSUMER_ADMISSION] 351 gate0_needed = False 352 if not self._allow_all_consumers: 353 if self._hook is None: 354 self._hook = MeshEndpointHook(peer_store=self._peer_store) 355 if self._nonce_store is None: 356 self._nonce_store = InMemoryNonceStore() 357 gate0_needed = True 358 if not self._allow_all_producers: 359 aster_alpns.append(ALPN_PRODUCER_ADMISSION) 360 if self._hook is None: 361 self._hook = MeshEndpointHook(peer_store=self._peer_store) 362 gate0_needed = True 363 364 # Build EndpointConfig so hooks (Gate 0) are installed when needed. 365 ep_cfg = _build_node_endpoint_config( 366 self._endpoint_config_template, enable_hooks=gate0_needed 367 ) 368 369 self._node = await IrohNode.memory_with_alpns(aster_alpns, ep_cfg) 370 addr_b64 = base64.b64encode( 371 self._node.node_addr_info().to_bytes() 372 ).decode() 373 374 # Auto-create ephemeral MeshState. Even when allow_all_producers=True 375 # we need the topic_id so the root node's shell can observe gossip. 376 if self._mesh_state is None and self._root_pubkey is not None: 377 self._mesh_state = make_ephemeral_mesh_state(self._root_pubkey) 378 379 # Build ServiceSummary list with per-spec contract_id. 
380 summaries: list[ServiceSummary] = [] 381 for svc in self._services_in: 382 svc_cls = svc if inspect.isclass(svc) else type(svc) 383 info = getattr(svc_cls, "__aster_service_info__", None) 384 if info is None: 385 raise TypeError( 386 f"{svc_cls!r} is not @service-decorated " 387 f"(missing __aster_service_info__)" 388 ) 389 cid = contract_id_from_service(svc_cls) 390 summaries.append( 391 ServiceSummary( 392 name=info.name, 393 version=info.version, 394 contract_id=cid, 395 channels={self._channel_name: addr_b64}, 396 pattern=info.scoped or "shared", 397 ) 398 ) 399 self._service_summaries = summaries 400 401 # ── C11.4.3: Startup publication verification ──────────────────── 402 # If .aster/manifest.json exists, verify that each service's live 403 # contract_id matches the committed manifest. Fatal on mismatch. 404 manifest_path = os.path.join(os.getcwd(), ".aster", "manifest.json") 405 if os.path.isfile(manifest_path): 406 import json as _json 407 from .contract.manifest import ContractManifest 408 409 try: 410 with open(manifest_path, encoding="utf-8") as f: 411 manifest_data = _json.load(f) 412 except Exception as exc: 413 raise RuntimeError( 414 f"Failed to read manifest at {manifest_path}: {exc}" 415 ) from exc 416 417 # Support both single manifest and list of manifests 418 manifests_list = manifest_data if isinstance(manifest_data, list) else [manifest_data] 419 manifest_by_key: dict[tuple[str, int], ContractManifest] = {} 420 for md in manifests_list: 421 m = ContractManifest(**md) 422 manifest_by_key[(m.service, m.version)] = m 423 424 for summary in summaries: 425 key = (summary.name, summary.version) 426 manifest = manifest_by_key.get(key) 427 if manifest is None: 428 continue # Service not in manifest -- skip 429 if summary.contract_id != manifest.contract_id: 430 raise RuntimeError( 431 f"Contract identity mismatch for {summary.name!r} " 432 f"v{summary.version}:\n" 433 f" Expected (manifest): {manifest.contract_id}\n" 434 f" Actual (live): 
{summary.contract_id}\n" 435 f" Manifest: {manifest_path}\n" 436 f" -> The service interface has changed without " 437 f"updating the manifest.\n" 438 f" Rerun `aster contract gen` and commit the " 439 f"updated manifest." 440 ) 441 logger.debug("Manifest verification passed for %d service(s)", len(manifest_by_key)) 442 443 # ── Contract publication to registry doc ───────────────────────── 444 # Create a registry doc, publish contract collections (with manifest 445 # including method schemas), and generate a read-only share ticket 446 # so consumers can discover full contract metadata. 447 await self._publish_contracts() 448 449 # ── Gate 3: capability interceptor & default-deny warning ──────── 450 # 451 # Build a service_name -> ServiceInfo map for the CapabilityInterceptor 452 # and check whether any service has authorization configured. 453 from .interceptors.capability import CapabilityInterceptor 454 455 svc_map: dict[str, Any] = {} 456 any_has_requires = False 457 for svc in self._services_in: 458 svc_cls = svc if inspect.isclass(svc) else type(svc) 459 info = getattr(svc_cls, "__aster_service_info__", None) 460 if info is not None: 461 svc_map[info.name] = info 462 if info.requires is not None: 463 any_has_requires = True 464 for mi in info.methods.values(): 465 if mi.requires is not None: 466 any_has_requires = True 467 468 # Auto-wire the CapabilityInterceptor when trust is configured 469 # (Gate 0 enabled) or when any service declares requires. 470 has_auth_interceptor = any( 471 isinstance(i, CapabilityInterceptor) for i in self._interceptors 472 ) 473 if (not self._allow_all_consumers or any_has_requires) and not has_auth_interceptor: 474 # Prepend so capability is checked before application interceptors. 475 self._interceptors.insert(0, CapabilityInterceptor(svc_map)) 476 477 # S12.2: default-deny startup warning. 478 # When Gate 0 is disabled (allow_all_consumers=True), any service 479 # without explicit authorization is wide open. 
Emit a warning so 480 # the developer knows. 481 if self._allow_all_consumers: 482 for svc in self._services_in: 483 svc_cls = svc if inspect.isclass(svc) else type(svc) 484 info = getattr(svc_cls, "__aster_service_info__", None) 485 if info is None: 486 continue 487 if info.public: 488 continue 489 svc_has_auth = info.requires is not None 490 if not svc_has_auth: 491 svc_has_auth = any( 492 mi.requires is not None for mi in info.methods.values() 493 ) 494 if not svc_has_auth: 495 # Also check if user explicitly added an auth interceptor. 496 from .interceptors.auth import AuthInterceptor 497 has_explicit_auth = any( 498 isinstance(i, AuthInterceptor) for i in self._interceptors 499 ) 500 if not has_explicit_auth: 501 warnings.warn( 502 f"Service '{info.name}' has no authorization configured " 503 f"and Gate 0 is disabled (allow_all_consumers=True). " 504 f"All consumers can call this service without " 505 f"authentication. Add @service(requires=...) or " 506 f"configure an auth interceptor for production use.", 507 UserWarning, 508 stacklevel=2, 509 ) 510 511 # Server borrows a NetClient view of the node. AsterServer owns the 512 # node lifecycle, so Server must NOT close the endpoint on its own. 
513 self._server = Server( 514 net_client(self._node), 515 services=self._services_in, 516 codec=self._codec, 517 interceptors=self._interceptors, 518 owns_endpoint=False, 519 peer_store=self._peer_store, 520 ) 521 522 self._print_banner() 523 self._started = True 524 525 # Always log startup info (visible even when stderr is not a TTY) 526 service_names = ", ".join(s.name for s in self._service_summaries) 527 mode = "open-gate" if self._allow_all_consumers else "trusted" 528 logger.info("server starting runtime=python services=[%s] mode=%s", service_names, mode) 529 530 def _print_banner(self) -> None: 531 """Print the startup banner with service info.""" 532 import sys 533 import os 534 535 # Only print banner when stderr is a terminal (not in tests/pipes) 536 if not sys.stderr.isatty(): 537 return 538 539 C = "\033[36m" # cyan 540 B = "\033[1m" # bold 541 D = "\033[2m" # dim 542 G = "\033[32m" # green 543 Y = "\033[33m" # yellow 544 W = "\033[37m" # white 545 R = "\033[0m" # reset 546 w = sys.stderr.write 547 548 # ── Banner ──────────────────────────────────────────────────────── 549 w(f"\n{C}{B}") 550 w(f" _ ____ _____ _____ ____\n") 551 w(f" / \\ / ___|_ _| ____| _ \\\n") 552 w(f" / _ \\ \\___ \\ | | | _| | |_) |\n") 553 w(f" / ___ \\ ___) || | | |___| _ <\n") 554 w(f" /_/ \\_\\____/ |_| |_____|_| \\_\\\n") 555 w(f"{R}\n") 556 w(f" {D}RPC after hostnames.{R}\n\n") 557 558 # ── Services table ──────────────────────────────────────────────── 559 if self._service_summaries: 560 # Find max name length for alignment 561 max_name = max(len(s.name) for s in self._service_summaries) 562 for s in self._service_summaries: 563 name_pad = s.name.ljust(max_name) 564 w(f" {G}●{R} {B}{name_pad}{R} {D}v{s.version}{R} {D}{s.contract_id}{R}\n") 565 w("\n") 566 567 # ── Endpoint ───────────────────────────────────────────────────── 568 compact = None 569 endpoint_id_full = None 570 if self._node: 571 try: 572 from . 
import AsterTicket 573 addr_info = self._node.node_addr_info() 574 endpoint_id_full = addr_info.endpoint_id 575 t = AsterTicket( 576 endpoint_id=addr_info.endpoint_id, 577 direct_addrs=addr_info.direct_addresses or [], 578 ) 579 compact = t.to_string() 580 except Exception: 581 pass 582 583 if endpoint_id_full: 584 short = endpoint_id_full[:16] + "…" 585 w(f" {D}node id:{R} {W}{short}{R} {D}(this node's keypair fingerprint){R}\n") 586 if compact: 587 w(f" {D}endpoint:{R} {compact}\n") 588 589 # ── Mode ────────────────────────────────────────────────────────── 590 mode_parts = [] 591 if self._allow_all_consumers: 592 mode_parts.append(f"{Y}open-gate{R}") 593 else: 594 mode_parts.append(f"{G}trusted{R}") 595 if self._registry_namespace: 596 mode_parts.append(f"{G}registry{R}") 597 w(f" {D}mode:{R} {' '.join(mode_parts)}\n") 598 599 # ── Logging ─────────────────────────────────────────────────────── 600 log_format = os.environ.get("ASTER_LOG_FORMAT", "text") 601 log_level = os.environ.get("ASTER_LOG_LEVEL", "info") 602 w(f" {D}log:{R} ASTER_LOG_FORMAT={W}{log_format}{R} ASTER_LOG_LEVEL={W}{log_level}{R}\n") 603 604 # ── Versions ────────────────────────────────────────────────────── 605 try: 606 from importlib.metadata import version as _pkg_version 607 aster_ver = _pkg_version("aster-rpc") 608 except Exception: 609 aster_ver = "?" 610 611 # Read iroh version from the native module 612 iroh_ver = "0.97" # pinned in Cargo.toml 613 614 w(f" {D}runtime:{R} aster-rpc {aster_ver} (python) iroh {iroh_ver}\n") 615 616 # ── Copyright ───────────────────────────────────────────────────── 617 w(f"\n {D}Copyright \u00a9 2026 Emrul Islam. All rights reserved.{R}\n\n") 618 619 async def _publish_contracts(self) -> None: 620 """Create a registry doc and publish each service's contract collection. 621 622 After publication, ``self._registry_namespace`` is set to the 64-char 623 hex namespace_id so consumer admission can return it. 
624 """ 625 assert self._node is not None 626 627 try: 628 dc = docs_client(self._node) 629 bc = blobs_client(self._node) 630 631 # Create the registry doc (producer owns the write capability) 632 registry_doc = await dc.create() 633 author_id = await dc.create_author() 634 635 for svc in self._services_in: 636 svc_cls = svc if inspect.isclass(svc) else type(svc) 637 info = getattr(svc_cls, "__aster_service_info__", None) 638 if info is None: 639 continue 640 641 # Build the type graph and contract 642 from .contract.identity import ( 643 ServiceContract, 644 build_type_graph, 645 canonical_xlang_bytes, 646 resolve_with_cycles, 647 compute_type_hash, 648 ) 649 from .contract.publication import build_collection, upload_collection 650 from .registry.keys import contract_key, version_key 651 from .registry.models import ArtifactRef 652 653 # Collect root types 654 root_types: list[type] = [] 655 for mi in info.methods.values(): 656 if mi.request_type is not None: 657 root_types.append(mi.request_type) 658 if mi.response_type is not None: 659 root_types.append(mi.response_type) 660 661 type_graph = build_type_graph(root_types) 662 type_defs = resolve_with_cycles(type_graph) 663 664 # Compute type hashes 665 type_hashes: dict[str, bytes] = {} 666 for fqn, td in type_defs.items(): 667 td_bytes = canonical_xlang_bytes(td) 668 type_hashes[fqn] = compute_type_hash(td_bytes) 669 670 # Build ServiceContract and canonical bytes 671 contract = ServiceContract.from_service_info(info, type_hashes) 672 contract_bytes = canonical_xlang_bytes(contract) 673 674 import blake3 as _blake3 675 contract_id = _blake3.blake3(contract_bytes).hexdigest() 676 677 # Build collection with full method schemas 678 entries = build_collection(contract, type_defs, service_info=info) 679 680 # Upload to blob store as a native HashSeq collection. 681 # GC protection is handled automatically by the HashSeq tag. 
682 collection_hash = await upload_collection(bc, entries) 683 684 # Create a collection ticket so consumers can download all 685 # collection blobs (index + entries) in one transfer 686 blob_ticket = bc.create_collection_ticket(collection_hash) 687 688 # Write ArtifactRef to registry doc 689 import time as _time 690 ref = ArtifactRef( 691 contract_id=contract_id, 692 collection_hash=collection_hash, 693 ticket=blob_ticket, 694 published_by=author_id, 695 published_at_epoch_ms=int(_time.time() * 1000), 696 collection_format="index", 697 ) 698 await registry_doc.set_bytes( 699 author_id, 700 contract_key(contract_id), 701 ref.to_json().encode(), 702 ) 703 704 # Also write the manifest JSON directly to the registry doc 705 # at a well-known key. This avoids the blob download round-trip 706 # for consumers that only need method schemas (like the shell). 707 manifest_data = None 708 for ename, edata in entries: 709 if ename == "manifest.json": 710 manifest_data = edata 711 break 712 if manifest_data: 713 from .registry.keys import version_key as _vk 714 manifest_key = f"manifests/{contract_id}".encode() 715 await registry_doc.set_bytes( 716 author_id, manifest_key, manifest_data 717 ) 718 719 # Version pointer 720 await registry_doc.set_bytes( 721 author_id, 722 version_key(info.name, info.version), 723 contract_id.encode(), 724 ) 725 726 logger.debug( 727 "Published contract %s for %s v%d (collection=%s)", 728 contract_id[:12], 729 info.name, 730 info.version, 731 collection_hash[:12], 732 ) 733 734 # share() enables the sync engine on the server side so consumers 735 # can replicate this doc. We only need the namespace_id on the wire. 
736 await registry_doc.share_with_addr("read") 737 self._registry_namespace = registry_doc.doc_id() 738 logger.debug( 739 "Registry doc ready -- namespace: %s", self._registry_namespace[:16] 740 ) 741 742 except Exception as exc: 743 # Publication failure is non-fatal -- the server still works, 744 # consumers just won't get rich contract metadata 745 logger.warning("Contract publication failed (non-fatal): %s", exc) 746 747 def serve(self) -> asyncio.Task: 748 """Spawn the accept loop (+ Gate 0 hook loop); return an aggregate task. 749 750 ``await server.serve()`` blocks until cancellation. The second call 751 returns the same task (idempotent). 752 """ 753 if self._serve_task is not None: 754 return self._serve_task 755 if not self._started: 756 raise RuntimeError("AsterServer.serve() called before start()") 757 assert self._server is not None 758 assert self._node is not None 759 760 subtasks: list[asyncio.Task] = [] 761 762 # Gate 0 hook loop: drain the after-handshake channel, apply the 763 # MeshEndpointHook allowlist for every connection. before_connect is 764 # auto-accepted inside NodeHookReceiver (the peer's endpoint ID 765 # isn't authenticated at that stage). 766 if self._hook is not None and self._node.has_hooks(): 767 self._hook_loop_task = asyncio.create_task( 768 self._run_gate0(), name="aster-gate0" 769 ) 770 subtasks.append(self._hook_loop_task) 771 772 self._peer_store.start_reaper() 773 774 subtasks.append( 775 asyncio.create_task(self._accept_loop(), name="aster-accept") 776 ) 777 778 # Delegated admission loop: accept connections on aster.admission 779 # ALPN for @aster-issued enrollment tokens. 780 if self._delegation_policies: 781 subtasks.append( 782 asyncio.create_task( 783 self._delegated_admission_loop(), name="aster-delegated-admission" 784 ) 785 ) 786 787 # Auto-register endpoints with @aster for published services. 788 # Requires producer service tokens (from `aster publish`). 
789 if self._producer_tokens: 790 subtasks.append( 791 asyncio.create_task( 792 self._aster_registration_loop(), name="aster-registration" 793 ) 794 ) 795 796 self._subtasks = subtasks 797 798 async def _wait_all() -> None: 799 try: 800 await asyncio.gather(*subtasks, return_exceptions=True) 801 except asyncio.CancelledError: 802 # Graceful shutdown on Ctrl+C / task cancellation 803 logger.info("Server shutting down...") 804 for t in subtasks: 805 t.cancel() 806 await asyncio.gather(*subtasks, return_exceptions=True) 807 808 self._serve_task = asyncio.create_task(_wait_all(), name="aster-server-serve") 809 return self._serve_task 810 811 async def _run_gate0(self) -> None: 812 """Take the hook receiver from the node and run the hook loop.""" 813 assert self._node is not None 814 assert self._hook is not None 815 receiver = await self._node.take_hook_receiver() 816 if receiver is None: 817 logger.warning("AsterServer: hooks enabled but no receiver available") 818 return 819 await self._hook.run_hook_loop(receiver) 820 821 async def _accept_loop(self) -> None: 822 """Pull from ``node.accept_aster()`` and dispatch per ALPN.""" 823 assert self._node is not None 824 assert self._server is not None 825 services_snapshot = list(self._service_summaries) 826 try: 827 while True: 828 try: 829 alpn, conn = await self._node.accept_aster() 830 except asyncio.CancelledError: 831 raise 832 except Exception as exc: # noqa: BLE001 833 logger.warning("AsterServer: accept_aster failed: %s", exc) 834 continue 835 836 if alpn == RPC_ALPN: 837 asyncio.create_task( 838 self._server.handle_connection(conn), 839 name="aster-rpc-conn", 840 ) 841 elif alpn == ALPN_CONSUMER_ADMISSION: 842 # Always handle consumer admission -- even when 843 # allow_all_consumers=True the consumer needs the 844 # services list from the admission response. 
                    asyncio.create_task(
                        handle_consumer_admission_connection(
                            conn,
                            root_pubkey=self._root_pubkey or b"\x00" * 32,
                            hook=self._hook,
                            nonce_store=self._nonce_store,
                            services_getter=lambda: services_snapshot,
                            registry_namespace_getter=lambda: self._registry_namespace,
                            allow_unenrolled=self._allow_all_consumers,
                            peer_store=self._peer_store,
                            gossip_topic_getter=lambda: (
                                self._mesh_state.topic_id
                                if self._mesh_state else None
                            ),
                        ),
                        name="aster-consumer-admission-conn",
                    )
                elif alpn == ALPN_PRODUCER_ADMISSION and not self._allow_all_producers:
                    assert self._root_pubkey is not None
                    assert self._mesh_state is not None
                    asyncio.create_task(
                        handle_producer_admission_connection(
                            conn,
                            own_root_pubkey=self._root_pubkey,
                            own_state=self._mesh_state,
                            config=self._clock_drift_config,
                            persist_state=self._persist_mesh_state,
                        ),
                        name="aster-producer-admission-conn",
                    )
                else:
                    try:
                        conn.close(0, b"unexpected alpn")
                    except Exception:  # noqa: BLE001
                        pass
        except asyncio.CancelledError:
            pass

    async def drain(self, grace_period: float = 10.0) -> None:
        """Graceful shutdown: stop accepting new connections, drain existing ones.

        Compatible with Kubernetes ``preStop`` hooks and SIGTERM handling.
        After drain completes, call ``close()`` to shut down the node.

        Args:
            grace_period: Seconds to wait for in-flight requests to complete.
        """
        logger.info("Draining server (grace_period=%.1fs)...", grace_period)
        if self._server is not None:
            await self._server.drain(grace_period)
        logger.info("Drain complete")

    async def close(self) -> None:
        """Cancel serve loops and close the node. Safe to call multiple times."""
        if self._closed:
            return
        self._closed = True

        for t in self._subtasks:
            t.cancel()
        if self._serve_task is not None:
            self._serve_task.cancel()
            try:
                await self._serve_task
            except (asyncio.CancelledError, Exception):
                pass

        # Close the node -- this triggers router.shutdown() which closes all
        # protocol handlers (including aster queue handlers) and the endpoint.
        if self._node is not None:
            try:
                await self._node.close()
            except Exception:
                pass

    # ── @aster endpoint registration ──────────────────────────────────────

    def _load_producer_tokens(self) -> None:
        """Load producer service tokens from .aster-identity [published_services.*]."""
        _, peer_entry = self._config.load_identity(
            peer_name=self._peer_name, role="producer"
        )
        if not peer_entry:
            return

        # The identity loader returns the raw peer dict. Published services
        # are stored as [published_services.<ServiceName>] sections in the
        # .aster-identity TOML file.
        published = peer_entry.get("published_services", {})
        if not isinstance(published, dict):
            return

        for svc_name, token_data in published.items():
            if isinstance(token_data, dict) and token_data.get("producer_token"):
                self._producer_tokens[svc_name] = token_data
                logger.debug("Loaded producer token for %s", svc_name)

    def _load_delegation_policies(self) -> None:
        """Build DelegatedAdmissionPolicy for each published service with aster_root_pubkey."""
        from aster.trust.delegated import DelegatedAdmissionPolicy

        _, peer_entry = self._config.load_identity(
            peer_name=self._peer_name, role="producer"
        )
        if not peer_entry:
            return

        published = peer_entry.get("published_services", {})
        if not isinstance(published, dict):
            return

        # Match published services to the services we're hosting
        for svc_name, pub_data in published.items():
            if not isinstance(pub_data, dict):
                continue
            aster_root_pubkey = pub_data.get("aster_root_pubkey", "")
            contract_id = pub_data.get("contract_id", "")
            handle = pub_data.get("handle", peer_entry.get("handle", ""))
            if aster_root_pubkey and contract_id:
                self._delegation_policies[svc_name] = DelegatedAdmissionPolicy(
                    target_handle=handle,
                    target_service=svc_name,
                    target_contract_id=contract_id,
                    aster_root_pubkey=aster_root_pubkey,
                )
                logger.debug("Delegation policy loaded for %s", svc_name)

    async def _delegated_admission_loop(self) -> None:
        """Accept connections on aster.admission ALPN and verify delegated tokens."""
        from aster.trust.delegated import handle_delegated_admission_connection

        assert self._node is not None
        ALPN_DELEGATED = b"aster.admission"

        try:
            while not self._closed:
                try:
                    conn = await self._node.accept_aster(ALPN_DELEGATED)
                except asyncio.CancelledError:
                    return
                except Exception as exc:
                    logger.debug("delegated admission accept error: %s", exc)
                    continue

                # Determine which policy applies based on connection metadata.
                # For now, use the first available policy (single-service
                # producer) or match by peer request content.
                policy = next(iter(self._delegation_policies.values()), None)
                if policy is None:
                    logger.warning("delegated admission: no policy configured")
                    continue

                asyncio.create_task(
                    handle_delegated_admission_connection(
                        conn,
                        policy=policy,
                        hook=self._hook,
                        peer_store=self._peer_store,
                    )
                )
        except asyncio.CancelledError:
            pass

    async def _aster_registration_loop(self) -> None:
        """Background loop: register endpoints with @aster for published services.

        Connects to @aster, registers each service's endpoint, then
        re-registers periodically before TTL expiry.
        """
        # Wait a moment for the server to be fully ready
        await asyncio.sleep(2)

        ttl = 300  # 5 minutes
        interval = ttl * 0.75  # re-register at 75% of TTL

        while not self._closed:
            try:
                await self._register_endpoints_with_aster(ttl)
            except asyncio.CancelledError:
                return
            except Exception as exc:
                logger.warning("@aster registration failed: %s", exc)

            # Wait before re-registering
            try:
                await asyncio.sleep(interval)
            except asyncio.CancelledError:
                return

    async def _register_endpoints_with_aster(self, ttl: int) -> None:
        """One-shot registration of all published service endpoints."""
        if not self._node or not self._producer_tokens:
            return

        # Build our endpoint info
        addr_info = self._node.node_addr_info()
        node_id = addr_info.endpoint_id
        relay = addr_info.relay_url or ""
        direct_addrs = addr_info.direct_addresses or []

        # Resolve @aster address from identity file or profile config.
        # The token itself contains the root_pubkey which can be used
        # to discover @aster via DNS TXT record in production.
        aster_addr = self._resolve_aster_address()
        if not aster_addr:
            logger.debug("No @aster address configured -- skipping registration")
            return

        aster_client = AsterClient(address=aster_addr)

        try:
            await aster_client.connect()
        except Exception as exc:
            logger.debug("Could not connect to @aster: %s", exc)
            return

        try:
            # For each published service with a token, call register_endpoint
            for svc_name, token in self._producer_tokens.items():
                try:
                    # Use the dynamic invoke path -- we don't have generated
                    # types for @aster's PublicationService
                    import json as _json
                    request = {
                        "producer_token": _json.dumps(token),
                        "node_id": node_id,
                        "relay": relay,
                        "direct_addrs": direct_addrs,
                        "ttl": ttl,
                    }

                    # Invoke register_endpoint on PublicationService
                    conn = await aster_client._rpc_conn_for(
                        next(
                            (s.channels.get("rpc", "") for s in aster_client._services
                             if s.name == "PublicationService"),
                            ""
                        )
                    )
                    from .transport.iroh import IrohTransport
                    transport = IrohTransport(conn, codec=self._codec)
                    resp = await transport.unary(
                        "PublicationService", "register_endpoint", request
                    )
                    logger.info(
                        "Registered endpoint with @aster: %s (%s)",
                        svc_name, node_id[:12],
                    )
                except Exception as exc:
                    logger.warning(
                        "Failed to register %s with @aster: %s",
                        svc_name, exc,
                    )
        finally:
            await aster_client.close()

    def _resolve_aster_address(self) -> str | None:
        """Resolve the @aster service address for endpoint registration.

        Checks (in order):

        1. ASTER_SERVICE_ADDRESS env var
        2. aster_service.address in the identity file's peer entry
        3. DNS TXT record on aster.site (future)
        """
        # Env var override
        addr = os.environ.get("ASTER_SERVICE_ADDRESS", "")
        if addr:
            return addr

        # Identity file -- the peer entry may have aster_service config
        _, peer_entry = self._config.load_identity(
            peer_name=self._peer_name, role="producer"
        )
        if peer_entry:
            addr = peer_entry.get("aster_service", "")
            if addr:
                return addr

        return None

    def _install_signal_handlers(self, grace_period: float = 10.0) -> None:
        """Install SIGTERM/SIGINT handlers for graceful shutdown.

        Call after ``serve()`` to enable k8s-compatible shutdown:

        - SIGTERM: drain → close (graceful)
        - SIGINT (Ctrl+C): drain → close (graceful)
        - Second SIGINT: immediate exit

        Usage::

            async with AsterServer(services=[...]) as srv:
                srv._install_signal_handlers()
                await srv.serve()
        """
        import signal

        loop = asyncio.get_event_loop()
        shutdown_count = 0

        def _handle_signal(sig: int, frame: Any) -> None:
            nonlocal shutdown_count
            shutdown_count += 1
            if shutdown_count > 1:
                logger.warning("Forced exit (second signal)")
                sys.exit(1)
            logger.info("Received %s -- draining...", signal.Signals(sig).name)
            loop.create_task(self._graceful_shutdown(grace_period))

        signal.signal(signal.SIGTERM, _handle_signal)
        signal.signal(signal.SIGINT, _handle_signal)

    async def _graceful_shutdown(self, grace_period: float) -> None:
        """Internal: drain then close."""
        try:
            await self.drain(grace_period)
        finally:
            await self.close()

    async def __aenter__(self) -> "AsterServer":
        await self.start()
        self.serve()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await self.close()

    # ── Properties ────────────────────────────────────────────────────────

    @property
    def address(self) -> str:
        """Connection address for this server (``aster1...`` ticket).

        Pass this to ``AsterClient(address=...)`` or ``aster shell``
        to connect. Includes relay address (if available) and direct
        addresses for LAN connectivity.
        """
        self._require_started()
        assert self._node is not None
        from . import AsterTicket
        addr_info = self._node.node_addr_info()

        # Resolve relay URL to IP:port for the ticket
        relay_addr = None
        if addr_info.relay_url:
            relay_addr = _resolve_relay_addr(addr_info.relay_url)

        t = AsterTicket(
            endpoint_id=addr_info.endpoint_id,
            relay_addr=relay_addr,
            direct_addrs=addr_info.direct_addresses or [],
        )
        return t.to_string()

    @property
    def endpoint_id(self) -> str:
        """Hex endpoint ID of this server's node."""
        self._require_started()
        assert self._node is not None
        return self._node.node_id()

    # ── Back-compat aliases ───────────────────────────────────────────────

    @property
    def _ticket(self) -> str:
        """Alias for :attr:`address` (internal back-compat)."""
        return self.address

    # Back-compat alias -- used in tests
    @property
    def endpoint_addr_b64(self) -> str:
        self._require_started()
        assert self._node is not None
        return base64.b64encode(self._node.node_addr_info().to_bytes()).decode()

    @property
    def _rpc_addr_b64(self) -> str:
        return self.endpoint_addr_b64

    @property
    def _admission_addr_b64(self) -> str | None:
        if self._allow_all_consumers and self._allow_all_producers:
            return None
        return self.endpoint_addr_b64

    @property
    def _consumer_admission_addr_b64(self) -> str | None:
        if self._allow_all_consumers:
            return None
        return self.endpoint_addr_b64

    @property
    def _producer_admission_addr_b64(self) -> str | None:
        if self._allow_all_producers:
            return None
        return self.endpoint_addr_b64

    @property
    def services(self) -> list[ServiceSummary]:
        """List of services hosted by this server."""
        self._require_started()
        return list(self._service_summaries)

    @property
    def root_pubkey(self) -> bytes | None:
        """The 32-byte ed25519 trust anchor public key, or ``None``."""
        return self._root_pubkey

    # ── Iroh protocol clients (lazy) ──────────────────────────────────────

    @property
    def node(self) -> IrohNode:
        """The underlying ``IrohNode`` (escape hatch for direct iroh access)."""
        self._require_started()
        assert self._node is not None
        return self._node

    @property
    def blobs(self) -> Any:
        """Blobs client backed by this node."""
        self._require_started()
        if self._blobs is None:
            self._blobs = blobs_client(self._node)
        return self._blobs

    @property
    def docs(self) -> Any:
        """Docs client backed by this node."""
        self._require_started()
        if self._docs is None:
            self._docs = docs_client(self._node)
        return self._docs

    @property
    def gossip(self) -> Any:
        """Gossip client backed by this node."""
        self._require_started()
        if self._gossip is None:
            self._gossip = gossip_client(self._node)
        return self._gossip

    # Back-compat aliases
    @property
    def endpoint(self) -> Any:
        """Escape hatch: the ``NetClient`` view of this node's endpoint."""
        self._require_started()
        return net_client(self._node)

    @property
    def _rpc_endpoint(self) -> Any:
        return self.endpoint

    def _require_started(self) -> None:
        if not self._started:
            raise RuntimeError("AsterServer not started; call start() first")
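The keep-alive cadence used by `_aster_registration_loop` above (register, sleep until 75% of the TTL has elapsed, repeat) can be sketched independently of the server. This is an illustrative standalone helper, not part of the aster API; `register` and `closed` stand in for the real registration call and shutdown flag:

```python
import asyncio

TTL_SECONDS = 300        # registration lifetime at @aster (5 minutes)
REREGISTER_AT = 0.75     # refresh at 75% of TTL, before it can expire

async def registration_loop(register, *, closed, ttl=TTL_SECONDS):
    """Sketch of the re-registration pattern: a failed registration is
    logged and retried on the next tick, never fatal to the loop."""
    interval = ttl * REREGISTER_AT   # 300s TTL -> refresh every 225s
    while not closed():
        try:
            await register(ttl)
        except Exception as exc:     # registration failure is non-fatal
            print(f"registration failed: {exc}")
        await asyncio.sleep(interval)
```

Refreshing at 75% of the TTL leaves a 25% margin, so a single missed refresh still lands before the old registration expires.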
High-level, declarative producer.
Builds a single IrohNode that serves blobs + docs + gossip
(iroh built-in protocols) alongside aster RPC (aster/1) and any
enabled admission ALPNs -- all on one endpoint, one node ID.
When any admission gate is active (allow_all_consumers=False or
allow_all_producers=False), the node is built with
enable_hooks=True and a background task runs the Gate 0
connection-level hook loop (MeshEndpointHook.run_hook_loop), which
gates all protocols (blobs, docs, gossip, aster/1, admission) at the
QUIC handshake layer.
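The admission wiring described above reduces to a small decision: consumer admission is always served (open-gate consumers still use it to discover services), producer admission is served only when the producer gate is active, and Gate 0 hooks are enabled whenever either gate is active. A minimal sketch, with illustrative string labels in place of the real ALPN byte constants:

```python
def plan_admission(allow_all_consumers: bool, allow_all_producers: bool):
    """Sketch of the ALPN / Gate 0 decision (illustrative labels only)."""
    alpns = ["rpc", "consumer-admission"]    # consumer admission is ALWAYS on
    if not allow_all_producers:
        alpns.append("producer-admission")   # producer gate active
    # Gate 0 connection-level hooks run when any admission gate is active.
    gate0_hooks = not allow_all_consumers or not allow_all_producers
    return alpns, gate0_hooks
```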
    def __init__(
        self,
        services: list,
        *,
        config: "AsterConfig | None" = None,
        peer: str | None = None,
        identity: str | None = None,
        # Inline overrides (take priority over config):
        root_pubkey: bytes | None = None,
        allow_all_consumers: bool | None = None,
        allow_all_producers: bool | None = None,
        endpoint_config: EndpointConfig | None = None,
        # Internal wiring:
        channel_name: str = "rpc",
        codec: Any | None = None,
        interceptors: list[Any] | None = None,
        hook: MeshEndpointHook | None = None,
        nonce_store: Any | None = None,
        registry_namespace: str = "",
        mesh_state: MeshState | None = None,
        clock_drift_config: ClockDriftConfig | None = None,
        persist_mesh_state: bool = False,
    ) -> None:
        """Create an Aster RPC server.

        .. note:: **Interceptors are not wired by default.** The server ships
           with interceptors for rate limiting, deadline enforcement, auth,
           capability checks, circuit breaking, metrics, audit logging, and
           retry hints -- but none are active unless you pass them via the
           ``interceptors`` parameter. For production use, wire at minimum
           ``DeadlineInterceptor`` and ``RateLimitInterceptor``. All
           interceptors live in ``aster.interceptors``.

        Args:
            services: List of ``@service``-decorated class instances to serve.
                At least one is required.
            config: Optional :class:`AsterConfig` for trust, storage, and
                networking settings. If omitted, settings are loaded from
                environment variables and defaults.
            peer: Optional peer name for this server (used in config lookup
                and identity file resolution).
            identity: Path to ``.aster-identity`` file (default: auto-detected
                from CWD). Overrides ``config.identity_file``.
            root_pubkey: 32-byte ed25519 public key for the trust anchor.
                Overrides ``config.root_pubkey`` if both are set.
            allow_all_consumers: If ``True``, skip consumer admission
                (open gate). Overrides ``config.allow_all_consumers``.
            allow_all_producers: If ``True``, skip producer admission.
                Overrides ``config.allow_all_producers``.
            endpoint_config: Low-level iroh endpoint configuration.

        Example::

            @service(name="MyService", version=1)
            class MyService:
                @rpc()
                async def hello(self, req: HelloRequest) -> HelloResponse:
                    return HelloResponse(message=f"Hello {req.name}")

            async with AsterServer(services=[MyService()]) as srv:
                print(srv.address)
                await srv.serve()
        """
        if not services:
            raise ValueError("AsterServer requires at least one service")

        # Auto-load config from env if none provided.
        from .config import AsterConfig
        if config is None:
            config = AsterConfig.from_env()
        if identity is not None:
            config.identity_file = identity
        self._config = config

        # Inline overrides win over config.
        self._allow_all_consumers = (
            allow_all_consumers if allow_all_consumers is not None
            else config.allow_all_consumers
        )
        self._allow_all_producers = (
            allow_all_producers if allow_all_producers is not None
            else config.allow_all_producers
        )

        # Load identity file if present (.aster-identity).
        self._peer_name = peer
        secret_key_from_identity, peer_entry = config.load_identity(
            peer_name=peer, role="producer"
        )
        if peer_entry and not root_pubkey:
            # Root pubkey comes from the credential in the identity file.
            root_pubkey = bytes.fromhex(peer_entry["root_pubkey"])
        if secret_key_from_identity and not config.secret_key:
            config.secret_key = secret_key_from_identity

        # Resolve root public key: inline > identity file > config file > ephemeral.
        # The root private key is NEVER on a running node (trust spec §1.1).
        pub = config.resolve_root_pubkey()
        self._root_pubkey = root_pubkey if root_pubkey is not None else pub

        # Dev mode: if using an ephemeral root key (no explicit pubkey file),
        # auto-open the consumer gate so the quickstart works without
        # credential files. In production (explicit root_pubkey_file),
        # the default allow_all_consumers=False requires credentials.
        if (
            config._ephemeral_privkey is not None
            and allow_all_consumers is None
            and config.root_pubkey_file is None
        ):
            self._allow_all_consumers = True
            logger.info(
                "Dev mode: allow_all_consumers=True (ephemeral root key). "
                "Set ASTER_ROOT_PUBKEY_FILE for production admission."
            )

        if (not self._allow_all_consumers or not self._allow_all_producers) and self._root_pubkey is None:
            raise ValueError(
                "root_pubkey is required when admission is enabled "
                "(allow_all_consumers=False or allow_all_producers=False). "
                "Set ASTER_ROOT_PUBKEY_FILE or pass root_pubkey= explicitly."
            )

        self._services_in: list = list(services)
        self._endpoint_config_template = endpoint_config or config.to_endpoint_config()
        self._channel_name = channel_name
        self._codec = codec
        from aster.interceptors.deadline import DeadlineInterceptor
        if interceptors is not None:
            self._interceptors = list(interceptors)
        else:
            self._interceptors = [DeadlineInterceptor()]
        self._hook = hook
        self._nonce_store = nonce_store

        # Admission → dispatch bridge: stores per-peer attributes
        from aster.peer_store import PeerAttributeStore
        self._peer_store = PeerAttributeStore()
        self._registry_namespace = registry_namespace
        self._mesh_state = mesh_state
        self._clock_drift_config = clock_drift_config
        self._persist_mesh_state = persist_mesh_state

        # Populated by start()
        self._started: bool = False
        self._node: IrohNode | None = None
        self._service_summaries: list[ServiceSummary] = []
        self._server: Server | None = None
        # Lazy caches for .blobs / .docs / .gossip
        self._blobs: Any | None = None
        self._docs: Any | None = None
        self._gossip: Any | None = None

        # Populated by serve()
        self._serve_task: asyncio.Task | None = None
        self._subtasks: list[asyncio.Task] = []
        self._closed: bool = False

        # Producer service tokens for @aster endpoint registration.
        self._producer_tokens: dict[str, dict] = {}  # service_name -> token dict
        self._load_producer_tokens()

        # Delegation policies for aster.admission ALPN.
        # Built from published_services entries that have aster_root_pubkey.
        self._delegation_policies: dict[str, Any] = {}  # service_name -> policy
        self._load_delegation_policies()
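The "inline overrides win over config" resolution above treats ``None`` as "not specified", so an explicit ``False`` is respected rather than falling through to the config value (which a naive ``inline or config`` would get wrong). A minimal sketch of that three-valued pattern:

```python
def resolve_flag(inline, config_value):
    """Inline override wins when explicitly passed; None means
    'not specified, fall back to config' -- the pattern used for
    allow_all_consumers / allow_all_producers above."""
    return inline if inline is not None else config_value
```

For example, ``resolve_flag(False, True)`` returns ``False``: an explicit inline ``False`` overrides a permissive config default.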
    async def start(self) -> None:
        """Create the unified node and compute ``ServiceSummary`` list. Idempotent."""
        if self._started:
            return

        # Configure structured logging from config (idempotent)
        from aster.logging import configure_logging
        configure_logging(
            format=self._config.log_format,
            level=self._config.log_level,
            mask=self._config.log_mask,
        )

        # Determine which aster ALPNs to register on the Router.
        # Consumer admission is ALWAYS registered -- even in open-gate mode
        # the consumer uses it to discover services.
        aster_alpns: list[bytes] = [RPC_ALPN, ALPN_CONSUMER_ADMISSION]
        gate0_needed = False
        if not self._allow_all_consumers:
            if self._hook is None:
                self._hook = MeshEndpointHook(peer_store=self._peer_store)
            if self._nonce_store is None:
                self._nonce_store = InMemoryNonceStore()
            gate0_needed = True
        if not self._allow_all_producers:
            aster_alpns.append(ALPN_PRODUCER_ADMISSION)
            if self._hook is None:
                self._hook = MeshEndpointHook(peer_store=self._peer_store)
            gate0_needed = True

        # Build EndpointConfig so hooks (Gate 0) are installed when needed.
        ep_cfg = _build_node_endpoint_config(
            self._endpoint_config_template, enable_hooks=gate0_needed
        )

        self._node = await IrohNode.memory_with_alpns(aster_alpns, ep_cfg)
        addr_b64 = base64.b64encode(
            self._node.node_addr_info().to_bytes()
        ).decode()

        # Auto-create ephemeral MeshState. Even when allow_all_producers=True
        # we need the topic_id so the root node's shell can observe gossip.
        if self._mesh_state is None and self._root_pubkey is not None:
            self._mesh_state = make_ephemeral_mesh_state(self._root_pubkey)

        # Build ServiceSummary list with per-spec contract_id.
        summaries: list[ServiceSummary] = []
        for svc in self._services_in:
            svc_cls = svc if inspect.isclass(svc) else type(svc)
            info = getattr(svc_cls, "__aster_service_info__", None)
            if info is None:
                raise TypeError(
                    f"{svc_cls!r} is not @service-decorated "
                    f"(missing __aster_service_info__)"
                )
            cid = contract_id_from_service(svc_cls)
            summaries.append(
                ServiceSummary(
                    name=info.name,
                    version=info.version,
                    contract_id=cid,
                    channels={self._channel_name: addr_b64},
                    pattern=info.scoped or "shared",
                )
            )
        self._service_summaries = summaries

        # ── C11.4.3: Startup publication verification ─────────────────────
        # If .aster/manifest.json exists, verify that each service's live
        # contract_id matches the committed manifest. Fatal on mismatch.
        manifest_path = os.path.join(os.getcwd(), ".aster", "manifest.json")
        if os.path.isfile(manifest_path):
            import json as _json
            from .contract.manifest import ContractManifest

            try:
                with open(manifest_path, encoding="utf-8") as f:
                    manifest_data = _json.load(f)
            except Exception as exc:
                raise RuntimeError(
                    f"Failed to read manifest at {manifest_path}: {exc}"
                ) from exc

            # Support both single manifest and list of manifests
            manifests_list = manifest_data if isinstance(manifest_data, list) else [manifest_data]
            manifest_by_key: dict[tuple[str, int], ContractManifest] = {}
            for md in manifests_list:
                m = ContractManifest(**md)
                manifest_by_key[(m.service, m.version)] = m

            for summary in summaries:
                key = (summary.name, summary.version)
                manifest = manifest_by_key.get(key)
                if manifest is None:
                    continue  # Service not in manifest -- skip
                if summary.contract_id != manifest.contract_id:
                    raise RuntimeError(
                        f"Contract identity mismatch for {summary.name!r} "
                        f"v{summary.version}:\n"
                        f"  Expected (manifest): {manifest.contract_id}\n"
                        f"  Actual (live):       {summary.contract_id}\n"
                        f"  Manifest: {manifest_path}\n"
                        f"  -> The service interface has changed without "
                        f"updating the manifest.\n"
                        f"     Rerun `aster contract gen` and commit the "
                        f"updated manifest."
                    )
            logger.debug("Manifest verification passed for %d service(s)", len(manifest_by_key))

        # ── Contract publication to registry doc ──────────────────────────
        # Create a registry doc, publish contract collections (with manifest
        # including method schemas), and generate a read-only share ticket
        # so consumers can discover full contract metadata.
        await self._publish_contracts()

        # ── Gate 3: capability interceptor & default-deny warning ─────────
        #
        # Build a service_name -> ServiceInfo map for the CapabilityInterceptor
        # and check whether any service has authorization configured.
        from .interceptors.capability import CapabilityInterceptor

        svc_map: dict[str, Any] = {}
        any_has_requires = False
        for svc in self._services_in:
            svc_cls = svc if inspect.isclass(svc) else type(svc)
            info = getattr(svc_cls, "__aster_service_info__", None)
            if info is not None:
                svc_map[info.name] = info
                if info.requires is not None:
                    any_has_requires = True
                for mi in info.methods.values():
                    if mi.requires is not None:
                        any_has_requires = True

        # Auto-wire the CapabilityInterceptor when trust is configured
        # (Gate 0 enabled) or when any service declares requires.
        has_auth_interceptor = any(
            isinstance(i, CapabilityInterceptor) for i in self._interceptors
        )
        if (not self._allow_all_consumers or any_has_requires) and not has_auth_interceptor:
            # Prepend so capability is checked before application interceptors.
            self._interceptors.insert(0, CapabilityInterceptor(svc_map))

        # S12.2: default-deny startup warning.
        # When Gate 0 is disabled (allow_all_consumers=True), any service
        # without explicit authorization is wide open. Emit a warning so
        # the developer knows.
        if self._allow_all_consumers:
            for svc in self._services_in:
                svc_cls = svc if inspect.isclass(svc) else type(svc)
                info = getattr(svc_cls, "__aster_service_info__", None)
                if info is None:
                    continue
                if info.public:
                    continue
                svc_has_auth = info.requires is not None
                if not svc_has_auth:
                    svc_has_auth = any(
                        mi.requires is not None for mi in info.methods.values()
                    )
                if not svc_has_auth:
                    # Also check if user explicitly added an auth interceptor.
                    from .interceptors.auth import AuthInterceptor
                    has_explicit_auth = any(
                        isinstance(i, AuthInterceptor) for i in self._interceptors
                    )
                    if not has_explicit_auth:
                        warnings.warn(
                            f"Service '{info.name}' has no authorization configured "
                            f"and Gate 0 is disabled (allow_all_consumers=True). "
                            f"All consumers can call this service without "
                            f"authentication. Add @service(requires=...) or "
                            f"configure an auth interceptor for production use.",
                            UserWarning,
                            stacklevel=2,
                        )

        # Server borrows a NetClient view of the node. AsterServer owns the
        # node lifecycle, so Server must NOT close the endpoint on its own.
        self._server = Server(
            net_client(self._node),
            services=self._services_in,
            codec=self._codec,
            interceptors=self._interceptors,
            owns_endpoint=False,
            peer_store=self._peer_store,
        )

        self._print_banner()
        self._started = True

        # Always log startup info (visible even when stderr is not a TTY)
        service_names = ", ".join(s.name for s in self._service_summaries)
        mode = "open-gate" if self._allow_all_consumers else "trusted"
        logger.info("server starting runtime=python services=[%s] mode=%s", service_names, mode)
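The startup manifest check above boils down to a dictionary lookup keyed on (service, version): accept both a single manifest dict and a list of them, skip services absent from the manifest, and fail hard on any contract_id mismatch. A simplified, framework-free sketch:

```python
def verify_contracts(manifest_data, live):
    """Simplified form of the startup manifest verification.

    manifest_data: parsed .aster/manifest.json -- a single manifest dict
        or a list of them, each with service/version/contract_id keys.
    live: maps (service, version) -> live contract_id.
    Raises RuntimeError on any mismatch; unlisted services are skipped.
    """
    manifests = manifest_data if isinstance(manifest_data, list) else [manifest_data]
    by_key = {(m["service"], m["version"]): m["contract_id"] for m in manifests}
    for key, live_cid in live.items():
        expected = by_key.get(key)
        if expected is not None and expected != live_cid:
            raise RuntimeError(
                f"Contract identity mismatch for {key[0]!r} v{key[1]}: "
                f"expected {expected}, got {live_cid}"
            )
```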
Create the unified node and compute ServiceSummary list. Idempotent.
    def serve(self) -> asyncio.Task:
        """Spawn the accept loop (+ Gate 0 hook loop); return an aggregate task.

        ``await server.serve()`` blocks until cancellation. The second call
        returns the same task (idempotent).
        """
        if self._serve_task is not None:
            return self._serve_task
        if not self._started:
            raise RuntimeError("AsterServer.serve() called before start()")
        assert self._server is not None
        assert self._node is not None

        subtasks: list[asyncio.Task] = []

        # Gate 0 hook loop: drain the after-handshake channel, apply the
        # MeshEndpointHook allowlist for every connection. before_connect is
        # auto-accepted inside NodeHookReceiver (the peer's endpoint ID
        # isn't authenticated at that stage).
        if self._hook is not None and self._node.has_hooks():
            self._hook_loop_task = asyncio.create_task(
                self._run_gate0(), name="aster-gate0"
            )
            subtasks.append(self._hook_loop_task)

        self._peer_store.start_reaper()

        subtasks.append(
            asyncio.create_task(self._accept_loop(), name="aster-accept")
        )

        # Delegated admission loop: accept connections on aster.admission
        # ALPN for @aster-issued enrollment tokens.
        if self._delegation_policies:
            subtasks.append(
                asyncio.create_task(
                    self._delegated_admission_loop(), name="aster-delegated-admission"
                )
            )

        # Auto-register endpoints with @aster for published services.
        # Requires producer service tokens (from `aster publish`).
        if self._producer_tokens:
            subtasks.append(
                asyncio.create_task(
                    self._aster_registration_loop(), name="aster-registration"
                )
            )

        self._subtasks = subtasks

        async def _wait_all() -> None:
            try:
                await asyncio.gather(*subtasks, return_exceptions=True)
            except asyncio.CancelledError:
                # Graceful shutdown on Ctrl+C / task cancellation.
                logger.info("Server shutting down...")
                for t in subtasks:
                    t.cancel()
                await asyncio.gather(*subtasks, return_exceptions=True)

        self._serve_task = asyncio.create_task(_wait_all(), name="aster-server-serve")
        return self._serve_task
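The aggregate-task pattern `serve()` uses — several loops spawned as subtasks, wrapped in one parent task that callers simply await — can be sketched in isolation. All names here (`run_aggregate`, `loop_a`, `loop_b`) are illustrative, not aster APIs.

```python
import asyncio

async def run_aggregate() -> list:
    async def loop_a():
        await asyncio.sleep(0.01)
        return "a"

    async def loop_b():
        await asyncio.sleep(0.01)
        return "b"

    subtasks = [asyncio.create_task(loop_a()), asyncio.create_task(loop_b())]

    async def _wait_all():
        # return_exceptions=True: one failing loop yields its exception as a
        # result instead of tearing down the whole gather mid-flight.
        return await asyncio.gather(*subtasks, return_exceptions=True)

    # The parent task is the single handle callers await; serve() caches it
    # so a second call returns the same task (idempotent).
    serve_task = asyncio.create_task(_wait_all())
    return await serve_task

print(asyncio.run(run_aggregate()))  # ['a', 'b']
```

Cancelling the parent task propagates into the gather, which is why `serve()` only needs to cancel `_serve_task` (plus the subtasks) on shutdown.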
    async def drain(self, grace_period: float = 10.0) -> None:
        """Graceful shutdown: stop accepting new connections, drain existing ones.

        Compatible with Kubernetes ``preStop`` hooks and SIGTERM handling.
        After drain completes, call ``close()`` to shut down the node.

        Args:
            grace_period: Seconds to wait for in-flight requests to complete.
        """
        logger.info("Draining server (grace_period=%.1fs)...", grace_period)
        if self._server is not None:
            await self._server.drain(grace_period)
        logger.info("Drain complete")
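The grace-period semantics of a drain can be sketched with plain asyncio tasks standing in for in-flight requests: wait up to `grace_period` seconds, then cancel whatever is left. `drain_demo` is an illustrative name, not an aster API.

```python
import asyncio

async def drain_demo(grace_period: float) -> tuple:
    # Three stand-in "in-flight requests" that finish quickly.
    inflight = [asyncio.create_task(asyncio.sleep(0.01)) for _ in range(3)]

    # Wait up to `grace_period` seconds for in-flight work to complete.
    done, pending = await asyncio.wait(inflight, timeout=grace_period)
    for t in pending:
        t.cancel()  # past the grace window: cancel what's left
    # After drain completes, close() would shut the node down.
    return len(done), len(pending)

print(asyncio.run(drain_demo(1.0)))  # (3, 0)
```

In a Kubernetes `preStop` hook you would call `drain()` first and `close()` second, so the pod stops taking traffic before the node tears down.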
    async def close(self) -> None:
        """Cancel serve loops and close the node. Safe to call multiple times."""
        if self._closed:
            return
        self._closed = True

        for t in self._subtasks:
            t.cancel()
        if self._serve_task is not None:
            self._serve_task.cancel()
            try:
                await self._serve_task
            except (asyncio.CancelledError, Exception):
                pass

        # Close the node -- this triggers router.shutdown() which closes all
        # protocol handlers (including aster queue handlers) and the endpoint.
        if self._node is not None:
            try:
                await self._node.close()
            except Exception:
                pass
    @property
    def address(self) -> str:
        """Connection address for this server (``aster1...`` ticket).

        Pass this to ``AsterClient(address=...)`` or ``aster shell``
        to connect. Includes relay address (if available) and direct
        addresses for LAN connectivity.
        """
        self._require_started()
        assert self._node is not None
        from . import AsterTicket

        addr_info = self._node.node_addr_info()

        # Resolve relay URL to IP:port for the ticket.
        relay_addr = None
        if addr_info.relay_url:
            relay_addr = _resolve_relay_addr(addr_info.relay_url)

        t = AsterTicket(
            endpoint_id=addr_info.endpoint_id,
            relay_addr=relay_addr,
            direct_addrs=addr_info.direct_addresses or [],
        )
        return t.to_string()
    @property
    def endpoint_id(self) -> str:
        """Hex endpoint ID of this server's node."""
        self._require_started()
        assert self._node is not None
        return self._node.node_id()
    @property
    def services(self) -> list[ServiceSummary]:
        """List of services hosted by this server."""
        self._require_started()
        return list(self._service_summaries)
    @property
    def root_pubkey(self) -> bytes | None:
        """The 32-byte ed25519 trust anchor public key, or ``None``."""
        return self._root_pubkey
    @property
    def node(self) -> IrohNode:
        """The underlying ``IrohNode`` (escape hatch for direct iroh access)."""
        self._require_started()
        assert self._node is not None
        return self._node
    @property
    def blobs(self) -> Any:
        """Blobs client backed by this node."""
        self._require_started()
        if self._blobs is None:
            self._blobs = blobs_client(self._node)
        return self._blobs
    @property
    def docs(self) -> Any:
        """Docs client backed by this node."""
        self._require_started()
        if self._docs is None:
            self._docs = docs_client(self._node)
        return self._docs
class AsterClient:
    """High-level, declarative consumer.

    Reads configuration from :class:`AsterConfig` (env vars / TOML file)
    just like :class:`AsterServer`. In dev mode (no credentials, ephemeral
    producer), ``AsterClient()`` with just ``ASTER_ENDPOINT_ADDR`` set is
    enough. In production, set ``ASTER_ENROLLMENT_CREDENTIAL`` to the
    path of a pre-signed token from the operator.

    ``endpoint_addr`` may be a base64 ``NodeAddr`` string (as printed by
    :class:`AsterServer`), an ``EndpointId`` hex string (when discovery is
    enabled), a :class:`NodeAddr` object, or raw ``NodeAddr.to_bytes()``
    bytes.
    """

    def __init__(
        self,
        *,
        config: "AsterConfig | None" = None,
        peer: str | None = None,
        identity: str | None = None,
        # Connection address (aster1... ticket, base64 NodeAddr, or hex EndpointId):
        address: str | None = None,
        # Back-compat alias for address:
        endpoint_addr: NodeAddr | str | bytes | None = None,
        root_pubkey: bytes | None = None,
        enrollment_credential_file: str | None = None,
        # Internal wiring:
        channel_name: str = "rpc",
    ) -> None:
        """Create an Aster RPC client.

        Args:
            config: Optional :class:`AsterConfig`. If omitted, settings are
                loaded from environment variables.
            peer: Peer name for identity file lookup.
            identity: Path to ``.aster-identity`` file (default: auto-detected
                from CWD). Overrides ``config.identity_file``.
            address: The server's address. Accepts:
                - ``aster1...`` compact ticket (recommended)
                - Base64-encoded ``NodeAddr``
                - Hex ``EndpointId`` (requires discovery)
            endpoint_addr: Alias for *address* (back-compat).
            root_pubkey: 32-byte ed25519 public key of the server's trust
                anchor. Required for credential-based admission.
            enrollment_credential_file: Path to a pre-signed enrollment
                credential (``.cred`` file from ``aster enroll``).

        Example::

            # Dev mode -- open gate, no credentials
            client = AsterClient(address="aster1...")
            await client.connect()

            # Production -- with credential
            client = AsterClient(
                address="aster1...",
                root_pubkey=pub_key,
                enrollment_credential_file="my-agent.cred",
            )
            await client.connect()
        """
        from .config import AsterConfig

        if config is None:
            config = AsterConfig.from_env()
        if identity is not None:
            config.identity_file = identity
        self._config = config
        self._peer_name = peer

        # Load identity file if present (.aster-identity).
        secret_key_from_identity, peer_entry = config.load_identity(
            peer_name=peer, role="consumer"
        )

        # If the user only passed `enrollment_credential_file` (no separate
        # `identity=`), and that file is a TOML `.aster-identity` (which is
        # what `aster enroll node` produces), reach into the same file for
        # the [node] secret_key. Otherwise the QUIC endpoint id we generate
        # at startup won't match the one baked into the credential and the
        # server rejects admission with no useful error.
        #
        # This makes `enrollment_credential_file=` and `identity=` do the
        # same thing for the same TOML file -- mirrors how the TS binding's
        # AsterClientWrapper now treats `enrollmentCredentialFile`.
        cred_file_for_identity = enrollment_credential_file or config.enrollment_credential_file
        if (
            not secret_key_from_identity
            and not peer_entry
            and cred_file_for_identity
            and os.path.exists(os.path.expanduser(cred_file_for_identity))
        ):
            try:
                paired_secret, paired_peer = config.load_identity_from_path(
                    os.path.expanduser(cred_file_for_identity),
                    peer_name=peer,
                    role="consumer",
                )
                if paired_secret:
                    secret_key_from_identity = paired_secret
                if paired_peer:
                    peer_entry = paired_peer
            except Exception:
                # Best-effort: if the .cred isn't a TOML identity file
                # (e.g. it's a flat JSON credential), fall through to the
                # existing _load_enrollment_credential path which handles
                # both formats. The user just won't get the secret key
                # auto-loaded -- they'll need to pass `identity=` too.
                pass

        if secret_key_from_identity and not config.secret_key:
            config.secret_key = secret_key_from_identity
        if peer_entry and not root_pubkey:
            root_pubkey = bytes.fromhex(peer_entry["root_pubkey"])

        # Resolve endpoint address: address > endpoint_addr > config > error.
        addr = address or endpoint_addr or config.endpoint_addr
        if addr is None:
            raise ValueError(
                "AsterClient requires an endpoint address. "
                "Set ASTER_ENDPOINT_ADDR or pass endpoint_addr= explicitly."
            )
        self._endpoint_addr_in = addr

        # Root pubkey (for optional response validation).
        pub = config.resolve_root_pubkey()
        self._root_pubkey = root_pubkey if root_pubkey is not None else pub

        # Enrollment credential: identity file peer > inline > config > None.
        if peer_entry and not enrollment_credential_file:
            # The peer entry IS the credential -- write it to a temp file
            # that _load_enrollment_credential can read, or inline it.
            self._inline_credential = peer_entry
            self._enrollment_credential_file = None
        else:
            self._inline_credential = None
            self._enrollment_credential_file = (
                enrollment_credential_file or config.enrollment_credential_file
            )
        self._enrollment_credential_iid = config.enrollment_credential_iid
        self._channel_name = channel_name

        self._node: Any | None = None
        self._ep: Any | None = None
        self._services: list[ServiceSummary] = []
        self._registry_namespace: str = ""
        self._gossip_topic: str = ""
        self._open_gate: bool = False
        self._rpc_conns: dict[str, Any] = {}
        self._clients: list[ServiceClient] = []
        self._connected: bool = False
        self._closed: bool = False
        self._reconnect_attempts: int = 0
        self._max_reconnect_attempts: int = 5
        self._reconnect_base_delay: float = 1.0  # seconds

    async def connect(self) -> None:
        """Create endpoint, run admission if credential present, store services.

        Idempotent -- second call is a no-op.
        """
        if self._connected:
            return

        # Build a full IrohNode so the consumer can join registry docs
        # and fetch blobs. Use persistent storage when configured -- this
        # preserves the node identity, joined docs, and downloaded blobs
        # across restarts.
        ep_cfg = self._config.to_endpoint_config()
        ep_config = _clone_config_with_alpns(
            ep_cfg, [ALPN_CONSUMER_ADMISSION, RPC_ALPN]
        )
        storage = self._config.storage_path
        if storage:
            self._node = await IrohNode.persistent_with_alpns(
                storage, [ALPN_CONSUMER_ADMISSION, RPC_ALPN], ep_config
            )
            logger.debug("Consumer node: persistent at %s", storage)
        else:
            self._node = await IrohNode.memory_with_alpns(
                [ALPN_CONSUMER_ADMISSION, RPC_ALPN], ep_config
            )
            logger.debug("Consumer node: in-memory (set ASTER_STORAGE_PATH for persistence)")
        self._ep = net_client(self._node)

        logger.debug(
            "Consumer node ready: endpoint_id=%s",
            self._node.node_addr_info().endpoint_id[:16] + "…",
        )

        # Always run the admission handshake -- even when the consumer gate
        # is open, the response carries the services list + registry ticket.
        await self._run_admission()
        self._connected = True
        self._reconnect_attempts = 0

    async def reconnect(self) -> None:
        """Reconnect after a connection drop.

        Closes stale connections, re-runs admission, and rebuilds the
        services list. Uses exponential backoff on repeated failures.
        """
        self._rpc_conns.clear()
        self._clients.clear()
        self._connected = False

        for attempt in range(self._max_reconnect_attempts):
            try:
                delay = self._reconnect_base_delay * (2 ** attempt)
                if attempt > 0:
                    logger.info(
                        "Reconnect attempt %d/%d (delay %.1fs)",
                        attempt + 1, self._max_reconnect_attempts, delay,
                    )
                    await asyncio.sleep(delay)

                await self._run_admission()
                self._connected = True
                self._reconnect_attempts = 0
                logger.info("Reconnected successfully")
                return

            except Exception as exc:
                logger.warning("Reconnect attempt %d failed: %s", attempt + 1, exc)

        raise ConnectionError(
            f"Failed to reconnect after {self._max_reconnect_attempts} attempts"
        )

    async def _run_admission(self) -> None:
        """Connect via ``aster.consumer_admission`` to get services.

        If an enrollment credential is configured, it's presented for
        verification. If not (dev mode / open gate), an empty credential
        is sent -- the producer auto-admits when ``allow_all_consumers=True``.
        """
        assert self._ep is not None

        # Build credential from: inline peer entry > credential file > empty.
        credential_file: str | None = None
        if self._inline_credential:
            cred = _credential_from_peer_entry(self._inline_credential)
            cred_json = consumer_cred_to_json(cred)
            credential_file = "<inline .aster-identity peer entry>"
        elif self._enrollment_credential_file:
            cred = _load_enrollment_credential(self._enrollment_credential_file)
            cred_json = consumer_cred_to_json(cred)
            credential_file = self._enrollment_credential_file
        else:
            # No credential -- dev mode / open-gate flow.
            cred_json = ""

        iid_token = self._enrollment_credential_iid or ""

        target = _coerce_node_addr(self._endpoint_addr_in)
        conn = await self._ep.connect_node_addr(target, ALPN_CONSUMER_ADMISSION)
        send, recv = await conn.open_bi()
        req = ConsumerAdmissionRequest(
            credential_json=cred_json,
            iid_token=iid_token,
        )
        await send.write_all(req.to_json().encode())
        await send.finish()
        raw = await recv.read_to_end(64 * 1024)
        resp = ConsumerAdmissionResponse.from_json(raw)
        if not resp.admitted:
            our_endpoint_id = ""
            try:
                if self._node is not None:
                    our_endpoint_id = self._node.node_addr_info().endpoint_id
            except Exception:
                pass
            raise AdmissionDeniedError(
                had_credential=bool(cred_json),
                credential_file=credential_file,
                our_endpoint_id=our_endpoint_id,
                server_address=str(self._endpoint_addr_in),
            )

        # If the admission succeeded without us presenting a credential,
        # the server must be running in open-gate mode (allow_all_consumers).
        # The shell uses this to suppress noisy "Identity not configured"
        # banners that are irrelevant on open-gate servers.
        self._open_gate = not cred_json

        self._services = list(resp.services)
        self._registry_namespace = resp.registry_namespace or ""
        self._gossip_topic = resp.gossip_topic or ""
        logger.info(
            "Admitted -- services: %s, registry_namespace: %s, gossip_topic: %s",
            [s.name for s in self._services],
            bool(self._registry_namespace),
            bool(self._gossip_topic),
        )

    async def _rpc_conn_for(self, rpc_addr_b64: str) -> Any:
        if rpc_addr_b64 in self._rpc_conns:
            return self._rpc_conns[rpc_addr_b64]
        assert self._ep is not None
        rpc_addr = _coerce_node_addr(rpc_addr_b64)
        conn = await self._ep.connect_node_addr(rpc_addr, RPC_ALPN)
        self._rpc_conns[rpc_addr_b64] = conn
        return conn

    async def client(
        self,
        service_cls: type,
        *,
        channel: str | None = None,
        codec: Any | None = None,
        interceptors: list[Any] | None = None,
    ) -> ServiceClient:
        """Return an RPC client for ``service_cls``."""
        if not self._connected:
            raise RuntimeError("AsterClient not connected; call connect() first")

        info = getattr(service_cls, "__aster_service_info__", None)
        if info is None:
            raise TypeError(
                f"{service_cls!r} is not @service-decorated "
                f"(missing __aster_service_info__)"
            )

        summary: ServiceSummary | None = None
        for s in self._services:
            if s.name == info.name and s.version == info.version:
                summary = s
                break
        if summary is None:
            raise LookupError(
                f"service {info.name!r} v{info.version} not offered by producer "
                f"(got: {[(s.name, s.version) for s in self._services]})"
            )

        channel_key = channel or self._channel_name
        if channel_key not in summary.channels:
            raise LookupError(
                f"service {info.name!r} has no channel {channel_key!r} "
                f"(available: {list(summary.channels)})"
            )

        conn = await self._rpc_conn_for(summary.channels[channel_key])
        # If the server advertises JSON-only (e.g. the TypeScript binding,
        # whose Fory implementation is not yet XLANG-compliant), pick the
        # JSON proxy codec automatically -- otherwise the typed Fory client
        # would send Fory bytes the server can't decode and the call would
        # fail with an opaque "Expected RpcStatus, got NoneType".
        if codec is None:
            modes = list(getattr(summary, "serialization_modes", None) or [])
            if modes and "xlang" not in modes and "json" in modes:
                from aster.json_codec import JsonProxyCodec
                codec = JsonProxyCodec()

        # Session-scoped services use the session protocol (one bidi stream
        # with method multiplexing). Dispatch to create_session.
        if info.scoped == RpcScope.SESSION:
            from aster.session import create_session
            client = await create_session(
                service_cls,
                connection=conn,
                codec=codec,
                interceptors=interceptors,
            )
        else:
            client = create_client(
                service_cls,
                connection=conn,
                codec=codec,
                interceptors=interceptors,
            )
        self._clients.append(client)
        return client

    async def close(self) -> None:
        if self._closed:
            return
        self._closed = True

        for c in self._clients:
            try:
                await c.close()
            except Exception:
                pass
        self._clients.clear()
        self._rpc_conns.clear()

        if self._node is not None:
            try:
                await self._node.shutdown()
            except Exception:
                pass
        elif self._ep is not None:
            try:
                await self._ep.close()
            except Exception:
                pass

    async def __aenter__(self) -> "AsterClient":
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await self.close()

    @property
    def services(self) -> list[ServiceSummary]:
        return list(self._services)

    @property
    def registry_namespace(self) -> str:
        """64-char hex namespace_id for the registry doc.

        Empty string if no registry doc was provided by the producer.
        """
        return self._registry_namespace

    @property
    def open_gate(self) -> bool:
        """True if the server admitted this client without a credential.

        When True, the server is running with ``allow_all_consumers=True``
        and no credential was required. Useful for clients (e.g. the shell)
        that want to suppress identity-related banners that are irrelevant
        on open-gate servers.
        """
        return self._open_gate

    def proxy(self, service_name: str) -> "ProxyClient":
        """Create a dynamic proxy client for a shared (stream-per-call) service.

        The proxy discovers methods from the service contract and builds
        method stubs at runtime. No local type definitions needed -- call
        methods with dicts and receive dicts back::

            mc = client.proxy("MissionControl")
            result = await mc.getStatus({"agent_id": "edge-1"})
            print(result["status"])

        For session-scoped services, use :meth:`session` instead.

        Args:
            service_name: The service name (e.g., ``"MissionControl"``).

        Returns:
            A :class:`ProxyClient` with method stubs for each RPC method.

        Raises:
            TypeError: If the service is session-scoped (use ``session()``).
        """
        if not self._connected:
            raise RuntimeError("AsterClient not connected; call connect() first")

        summary = self._find_service(service_name)

        if getattr(summary, "pattern", "shared") == "session":
            raise TypeError(
                f"'{service_name}' is session-scoped. "
                f"Use 'await client.session(\"{service_name}\")' instead of "
                f"'client.proxy(\"{service_name}\")'."
            )

        return ProxyClient(service_name=service_name, aster_client=self)

    async def session(self, service_name: str) -> "SessionProxyClient":
        """Create a dynamic proxy client for a session-scoped service.

        Opens a single bidirectional QUIC stream and multiplexes calls
        over it. Maintains a lock to ensure one call in flight at a time
        (spec requirement). Call methods with dicts, receive dicts::

            agent = await client.session("AgentSession")
            result = await agent.register({"agent_id": "edge-1"})
            print(result["assignment"])
            await agent.close()

        For shared (stream-per-call) services, use :meth:`proxy` instead.

        Args:
            service_name: The service name (e.g., ``"AgentSession"``).

        Returns:
            A session proxy with method stubs. Must be closed when done.
        """
        if not self._connected:
            raise RuntimeError("AsterClient not connected; call connect() first")

        summary = self._find_service(service_name)

        channel_key = self._channel_name
        if channel_key not in summary.channels:
            channel_key = next(iter(summary.channels), self._channel_name)

        conn = await self._rpc_conn_for(summary.channels.get(channel_key, ""))

        codec = None
        modes = list(getattr(summary, "serialization_modes", None) or [])
        if modes and "xlang" not in modes and "json" in modes:
            from aster.json_codec import JsonProxyCodec
            codec = JsonProxyCodec()

        from aster.session import create_proxy_session
        session_client = await create_proxy_session(
            service_name=service_name,
            connection=conn,
            codec=codec,
            aster_client=self,
        )
        self._clients.append(session_client)
        return session_client

    def _find_service(self, service_name: str) -> "ServiceSummary":
        """Look up a service by name in the admission response."""
        for s in self._services:
            if s.name == service_name:
                return s
        available = [s.name for s in self._services]
        raise ValueError(
            f"Service '{service_name}' not found. "
            f"Available: {available}"
        )

    @property
    def gossip_topic(self) -> str:
        """Hex-encoded 32-byte gossip topic ID for the producer mesh.

        Only populated when the connecting consumer is the root node
        (endpoint_id == root_pubkey). Empty string otherwise.
        """
        return self._gossip_topic
High-level, declarative consumer.
Reads configuration from AsterConfig (env vars / TOML file)
just like AsterServer. In dev mode (no credentials, ephemeral
producer), AsterClient() with just ASTER_ENDPOINT_ADDR set is
enough. In production, set ASTER_ENROLLMENT_CREDENTIAL to the
path of a pre-signed token from the operator.
endpoint_addr may be a base64 NodeAddr string (as printed by
AsterServer), an EndpointId hex string (when discovery is
enabled), a NodeAddr object, or raw NodeAddr.to_bytes()
bytes.
1322 def __init__( 1323 self, 1324 *, 1325 config: "AsterConfig | None" = None, 1326 peer: str | None = None, 1327 identity: str | None = None, 1328 # Connection address (aster1... ticket, base64 NodeAddr, or hex EndpointId): 1329 address: str | None = None, 1330 # Back-compat alias for address: 1331 endpoint_addr: NodeAddr | str | bytes | None = None, 1332 root_pubkey: bytes | None = None, 1333 enrollment_credential_file: str | None = None, 1334 # Internal wiring: 1335 channel_name: str = "rpc", 1336 ) -> None: 1337 """Create an Aster RPC client. 1338 1339 Args: 1340 config: Optional :class:`AsterConfig`. If omitted, settings are 1341 loaded from environment variables. 1342 peer: Peer name for identity file lookup. 1343 identity: Path to ``.aster-identity`` file (default: auto-detected 1344 from CWD). Overrides ``config.identity_file``. 1345 address: The server's address. Accepts: 1346 - ``aster1...`` compact ticket (recommended) 1347 - Base64-encoded ``NodeAddr`` 1348 - Hex ``EndpointId`` (requires discovery) 1349 endpoint_addr: Alias for *address* (back-compat). 1350 root_pubkey: 32-byte ed25519 public key of the server's trust 1351 anchor. Required for credential-based admission. 1352 enrollment_credential_file: Path to a pre-signed enrollment 1353 credential (``.cred`` file from ``aster enroll``). 1354 1355 Example:: 1356 1357 # Dev mode -- open gate, no credentials 1358 client = AsterClient(address="aster1...") 1359 await client.connect() 1360 1361 # Production -- with credential 1362 client = AsterClient( 1363 address="aster1...", 1364 root_pubkey=pub_key, 1365 enrollment_credential_file="my-agent.cred", 1366 ) 1367 await client.connect() 1368 """ 1369 from .config import AsterConfig 1370 1371 if config is None: 1372 config = AsterConfig.from_env() 1373 if identity is not None: 1374 config.identity_file = identity 1375 self._config = config 1376 self._peer_name = peer 1377 1378 # Load identity file if present (.aster-identity). 
1379 secret_key_from_identity, peer_entry = config.load_identity( 1380 peer_name=peer, role="consumer" 1381 ) 1382 1383 # If the user only passed `enrollment_credential_file` (no separate 1384 # `identity=`), and that file is a TOML `.aster-identity` (which is 1385 # what `aster enroll node` produces), reach into the same file for 1386 # the [node] secret_key. Otherwise the QUIC endpoint id we generate 1387 # at startup won't match the one baked into the credential and the 1388 # server rejects admission with no useful error. 1389 # 1390 # This makes `enrollment_credential_file=` and `identity=` do the 1391 # same thing for the same TOML file -- mirrors how the TS binding's 1392 # AsterClientWrapper now treats `enrollmentCredentialFile`. 1393 cred_file_for_identity = enrollment_credential_file or config.enrollment_credential_file 1394 if ( 1395 not secret_key_from_identity 1396 and not peer_entry 1397 and cred_file_for_identity 1398 and os.path.exists(os.path.expanduser(cred_file_for_identity)) 1399 ): 1400 try: 1401 paired_secret, paired_peer = config.load_identity_from_path( 1402 os.path.expanduser(cred_file_for_identity), 1403 peer_name=peer, 1404 role="consumer", 1405 ) 1406 if paired_secret: 1407 secret_key_from_identity = paired_secret 1408 if paired_peer: 1409 peer_entry = paired_peer 1410 except Exception: 1411 # Best-effort: if the .cred isn't a TOML identity file 1412 # (e.g. it's a flat JSON credential), fall through to the 1413 # existing _load_enrollment_credential path which handles 1414 # both formats. The user just won't get the secret key 1415 # auto-loaded -- they'll need to pass `identity=` too. 1416 pass 1417 1418 if secret_key_from_identity and not config.secret_key: 1419 config.secret_key = secret_key_from_identity 1420 if peer_entry and not root_pubkey: 1421 root_pubkey = bytes.fromhex(peer_entry["root_pubkey"]) 1422 1423 # Resolve endpoint address: address > endpoint_addr > config > error. 
1424 addr = address or endpoint_addr or config.endpoint_addr 1425 if addr is None: 1426 raise ValueError( 1427 "AsterClient requires an endpoint address. " 1428 "Set ASTER_ENDPOINT_ADDR or pass endpoint_addr= explicitly." 1429 ) 1430 self._endpoint_addr_in = addr 1431 1432 # Root pubkey (for optional response validation). 1433 pub = config.resolve_root_pubkey() 1434 self._root_pubkey = root_pubkey if root_pubkey is not None else pub 1435 1436 # Enrollment credential: identity file peer > inline > config > None. 1437 if peer_entry and not enrollment_credential_file: 1438 # The peer entry IS the credential -- write it to a temp file 1439 # that _load_enrollment_credential can read, or inline it. 1440 self._inline_credential = peer_entry 1441 self._enrollment_credential_file = None 1442 else: 1443 self._inline_credential = None 1444 self._enrollment_credential_file = ( 1445 enrollment_credential_file or config.enrollment_credential_file 1446 ) 1447 self._enrollment_credential_iid = config.enrollment_credential_iid 1448 self._channel_name = channel_name 1449 1450 self._node: Any | None = None 1451 self._ep: Any | None = None 1452 self._services: list[ServiceSummary] = [] 1453 self._registry_namespace: str = "" 1454 self._gossip_topic: str = "" 1455 self._open_gate: bool = False 1456 self._rpc_conns: dict[str, Any] = {} 1457 self._clients: list[ServiceClient] = [] 1458 self._connected: bool = False 1459 self._closed: bool = False 1460 self._reconnect_attempts: int = 0 1461 self._max_reconnect_attempts: int = 5 1462 self._reconnect_base_delay: float = 1.0 # seconds
Create an Aster RPC client.
Args:
config: Optional AsterConfig. If omitted, settings are
loaded from environment variables.
peer: Peer name for identity file lookup.
identity: Path to .aster-identity file (default: auto-detected
from CWD). Overrides config.identity_file.
address: The server's address. Accepts:
- aster1... compact ticket (recommended)
- Base64-encoded NodeAddr
- Hex EndpointId (requires discovery)
endpoint_addr: Alias for address (back-compat).
root_pubkey: 32-byte ed25519 public key of the server's trust
anchor. Required for credential-based admission.
enrollment_credential_file: Path to a pre-signed enrollment
credential (.cred file from aster enroll).
Example::
# Dev mode -- open gate, no credentials
client = AsterClient(address="aster1...")
await client.connect()
# Production -- with credential
client = AsterClient(
address="aster1...",
root_pubkey=pub_key,
enrollment_credential_file="my-agent.cred",
)
await client.connect()
1464 async def connect(self) -> None: 1465 """Create endpoint, run admission if credential present, store services. 1466 1467 Idempotent -- second call is a no-op. 1468 """ 1469 if self._connected: 1470 return 1471 1472 # Build a full IrohNode so the consumer can join registry docs 1473 # and fetch blobs. Use persistent storage when configured -- this 1474 # preserves the node identity, joined docs, and downloaded blobs 1475 # across restarts. 1476 ep_cfg = self._config.to_endpoint_config() 1477 ep_config = _clone_config_with_alpns( 1478 ep_cfg, [ALPN_CONSUMER_ADMISSION, RPC_ALPN] 1479 ) 1480 storage = self._config.storage_path 1481 if storage: 1482 self._node = await IrohNode.persistent_with_alpns( 1483 storage, [ALPN_CONSUMER_ADMISSION, RPC_ALPN], ep_config 1484 ) 1485 logger.debug("Consumer node: persistent at %s", storage) 1486 else: 1487 self._node = await IrohNode.memory_with_alpns( 1488 [ALPN_CONSUMER_ADMISSION, RPC_ALPN], ep_config 1489 ) 1490 logger.debug("Consumer node: in-memory (set ASTER_STORAGE_PATH for persistence)") 1491 self._ep = net_client(self._node) 1492 1493 logger.debug( 1494 "Consumer node ready: endpoint_id=%s", 1495 self._node.node_addr_info().endpoint_id[:16] + "…", 1496 ) 1497 1498 # Always run the admission handshake -- even when the consumer gate 1499 # is open, the response carries the services list + registry ticket. 1500 await self._run_admission() 1501 self._connected = True 1502 self._reconnect_attempts = 0
    async def reconnect(self) -> None:
        """Reconnect after a connection drop.

        Closes stale connections, re-runs admission, and rebuilds the
        services list. Uses exponential backoff on repeated failures.
        """
        self._rpc_conns.clear()
        self._clients.clear()
        self._connected = False

        for attempt in range(self._max_reconnect_attempts):
            try:
                delay = self._reconnect_base_delay * (2 ** attempt)
                if attempt > 0:
                    logger.info(
                        "Reconnect attempt %d/%d (delay %.1fs)",
                        attempt + 1, self._max_reconnect_attempts, delay,
                    )
                    await asyncio.sleep(delay)

                await self._run_admission()
                self._connected = True
                self._reconnect_attempts = 0
                logger.info("Reconnected successfully")
                return

            except Exception as exc:
                logger.warning("Reconnect attempt %d failed: %s", attempt + 1, exc)

        raise ConnectionError(
            f"Failed to reconnect after {self._max_reconnect_attempts} attempts"
        )
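The backoff schedule used by ``reconnect`` -- first retry immediate, each later retry waiting twice as long -- can be sketched standalone. The base delay and attempt count below are illustrative values, not the library's defaults:

```python
def backoff_delays(base: float, attempts: int) -> list[float]:
    """Delay before each reconnect attempt: 0 for the first try,
    then base * 2**attempt for each subsequent try."""
    return [0.0 if a == 0 else base * (2 ** a) for a in range(attempts)]

print(backoff_delays(0.5, 5))  # [0.0, 1.0, 2.0, 4.0, 8.0]
```

If every attempt fails, the client gives up with a ``ConnectionError`` naming the attempt limit.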
    async def client(
        self,
        service_cls: type,
        *,
        channel: str | None = None,
        codec: Any | None = None,
        interceptors: list[Any] | None = None,
    ) -> ServiceClient:
        """Return an RPC client for ``service_cls``."""
        if not self._connected:
            raise RuntimeError("AsterClient not connected; call connect() first")

        info = getattr(service_cls, "__aster_service_info__", None)
        if info is None:
            raise TypeError(
                f"{service_cls!r} is not @service-decorated "
                f"(missing __aster_service_info__)"
            )

        summary: ServiceSummary | None = None
        for s in self._services:
            if s.name == info.name and s.version == info.version:
                summary = s
                break
        if summary is None:
            raise LookupError(
                f"service {info.name!r} v{info.version} not offered by producer "
                f"(got: {[(s.name, s.version) for s in self._services]})"
            )

        channel_key = channel or self._channel_name
        if channel_key not in summary.channels:
            raise LookupError(
                f"service {info.name!r} has no channel {channel_key!r} "
                f"(available: {list(summary.channels)})"
            )

        conn = await self._rpc_conn_for(summary.channels[channel_key])

        # If the server advertises JSON-only (e.g. the TypeScript binding,
        # whose Fory implementation is not yet XLANG-compliant), pick the
        # JSON proxy codec automatically -- otherwise the typed Fory client
        # would send Fory bytes the server can't decode and the call would
        # fail with an opaque "Expected RpcStatus, got NoneType".
        if codec is None:
            modes = list(getattr(summary, "serialization_modes", None) or [])
            if modes and "xlang" not in modes and "json" in modes:
                from aster.json_codec import JsonProxyCodec
                codec = JsonProxyCodec()

        # Session-scoped services use the session protocol (one bidi stream
        # with method multiplexing). Dispatch to create_session.
        if info.scoped == RpcScope.SESSION:
            from aster.session import create_session
            client = await create_session(
                service_cls,
                connection=conn,
                codec=codec,
                interceptors=interceptors,
            )
        else:
            client = create_client(
                service_cls,
                connection=conn,
                codec=codec,
                interceptors=interceptors,
            )
        self._clients.append(client)
        return client
    @property
    def registry_namespace(self) -> str:
        """64-char hex namespace_id for the registry doc.

        Empty string if no registry doc was provided by the producer.
        """
        return self._registry_namespace
    @property
    def open_gate(self) -> bool:
        """True if the server admitted this client without a credential.

        When True, the server is running with ``allow_all_consumers=True``
        and no credential was required. Useful for clients (e.g. the shell)
        that want to suppress identity-related banners that are irrelevant
        on open-gate servers.
        """
        return self._open_gate
    def proxy(self, service_name: str) -> "ProxyClient":
        """Create a dynamic proxy client for a shared (stream-per-call) service.

        The proxy discovers methods from the service contract and builds
        method stubs at runtime. No local type definitions needed -- call
        methods with dicts and receive dicts back::

            mc = client.proxy("MissionControl")
            result = await mc.getStatus({"agent_id": "edge-1"})
            print(result["status"])

        For session-scoped services, use :meth:`session` instead.

        Args:
            service_name: The service name (e.g., ``"MissionControl"``).

        Returns:
            A :class:`ProxyClient` with method stubs for each RPC method.

        Raises:
            TypeError: If the service is session-scoped (use ``session()``).
        """
        if not self._connected:
            raise RuntimeError("AsterClient not connected; call connect() first")

        summary = self._find_service(service_name)

        if getattr(summary, "pattern", "shared") == "session":
            raise TypeError(
                f"'{service_name}' is session-scoped. "
                f"Use 'await client.session(\"{service_name}\")' instead of "
                f"'client.proxy(\"{service_name}\")'."
            )

        return ProxyClient(service_name=service_name, aster_client=self)
    async def session(self, service_name: str) -> "SessionProxyClient":
        """Create a dynamic proxy client for a session-scoped service.

        Opens a single bidirectional QUIC stream and multiplexes calls
        over it. Maintains a lock to ensure one call in flight at a time
        (spec requirement). Call methods with dicts, receive dicts::

            agent = await client.session("AgentSession")
            result = await agent.register({"agent_id": "edge-1"})
            print(result["assignment"])
            await agent.close()

        For shared (stream-per-call) services, use :meth:`proxy` instead.

        Args:
            service_name: The service name (e.g., ``"AgentSession"``).

        Returns:
            A session proxy with method stubs. Must be closed when done.
        """
        if not self._connected:
            raise RuntimeError("AsterClient not connected; call connect() first")

        summary = self._find_service(service_name)

        channel_key = self._channel_name
        if channel_key not in summary.channels:
            channel_key = next(iter(summary.channels), self._channel_name)

        conn = await self._rpc_conn_for(summary.channels.get(channel_key, ""))

        codec = None
        modes = list(getattr(summary, "serialization_modes", None) or [])
        if modes and "xlang" not in modes and "json" in modes:
            from aster.json_codec import JsonProxyCodec
            codec = JsonProxyCodec()

        from aster.session import create_proxy_session
        session_client = await create_proxy_session(
            service_name=service_name,
            connection=conn,
            codec=codec,
            aster_client=self,
        )
        self._clients.append(session_client)
        return session_client
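The "one call in flight at a time" guarantee of the session protocol can be illustrated with a plain ``asyncio.Lock``. Everything here is a hypothetical sketch, not aster API -- it only demonstrates why concurrent calls on one session serialize rather than interleave on the shared stream:

```python
import asyncio

class OneCallAtATime:
    """Sketch: serialize calls over a single shared stream."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._active = 0
        self.max_concurrent = 0  # observed concurrency, for demonstration

    async def call(self, coro_fn):
        async with self._lock:  # only one call in flight (spec requirement)
            self._active += 1
            self.max_concurrent = max(self.max_concurrent, self._active)
            try:
                return await coro_fn()
            finally:
                self._active -= 1

async def main():
    session = OneCallAtATime()

    async def fake_rpc():
        await asyncio.sleep(0.01)  # stand-in for a round trip
        return {"ok": True}

    results = await asyncio.gather(*(session.call(fake_rpc) for _ in range(5)))
    return session.max_concurrent, results

max_conc, results = asyncio.run(main())
print(max_conc)  # 1
```

Even with five calls launched concurrently via ``gather``, the lock keeps the observed concurrency at one.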
    @property
    def gossip_topic(self) -> str:
        """Hex-encoded 32-byte gossip topic ID for the producer mesh.

        Only populated when the connecting consumer is the root node
        (endpoint_id == root_pubkey). Empty string otherwise.
        """
        return self._gossip_topic
@dataclass
class AsterConfig:
    """Unified configuration for :class:`AsterServer`.

    Combines trust (root public key, admission policy), storage (memory vs
    persistent), and networking (relay, bind address, etc.) into one object.

    **Trust model (Aster-trust-spec.md §1.1):** The root *private* key is
    offline -- it never touches a running node. Nodes receive only the root
    *public* key (to verify credentials) and optionally an enrollment
    credential (a pre-signed token for mesh join). The founding node of a
    mesh needs no enrollment credential; it bootstraps the accepted-producer
    set with just its own EndpointId.

    Three ways to get an ``AsterConfig``:

    1. **Auto from env** (default when ``AsterServer`` gets no config)::

        config = AsterConfig.from_env()

    2. **From a TOML file** (with env overrides)::

        config = AsterConfig.from_file("aster.toml")

    3. **Inline** (testing, scripts)::

        config = AsterConfig(root_pubkey=pub)
    """

    # ── Trust ────────────────────────────────────────────────────────────

    root_pubkey: bytes | None = None
    """32-byte ed25519 root public key (the deployment trust anchor).
    Highest priority -- overrides ``root_pubkey_file`` when set."""

    root_pubkey_file: str | None = None
    """Path to a file containing the root public key. Accepts either a
    plain hex string or a JSON object with a ``"public_key"`` field."""

    enrollment_credential_file: str | None = None
    """Path to a JSON enrollment credential (pre-signed by the offline root
    key). Required when a node joins an existing producer mesh. Not needed
    for the founding node or for dev/ephemeral mode."""

    allow_all_consumers: bool = False
    """Skip consumer admission gate. Default: gate consumers."""

    enrollment_credential_iid: str | None = None
    """Cloud Instance Identity Document token. Required when the enrollment
    credential's policy includes ``aster.iid_*`` attributes (AWS/GCP/Azure).
    Set via ``ASTER_ENROLLMENT_CREDENTIAL_IID``."""

    allow_all_producers: bool = True
    """Skip producer admission gate. Default: allow all producers."""

    # ── Connect (consumer-side) ──────────────────────────────────────────

    endpoint_addr: str | None = None
    """Producer's endpoint address (base64 NodeAddr or EndpointId hex string).
    The consumer dials this to reach RPC + admission + blobs/docs/gossip.
    Set via ``ASTER_ENDPOINT_ADDR``."""

    # ── Storage ──────────────────────────────────────────────────────────

    storage_path: str | None = None
    """If set, use FsStore at this path; otherwise in-memory."""

    # ── Network (forwarded to EndpointConfig) ────────────────────────────

    secret_key: bytes | None = None
    """32-byte node identity key. Determines the stable ``EndpointId``.
    If unset, a fresh identity is generated each run (fine for dev)."""

    relay_mode: str | None = None
    bind_addr: str | None = None
    enable_monitoring: bool = False
    enable_hooks: bool = False
    hook_timeout_ms: int = 5000
    clear_ip_transports: bool = False
    clear_relay_transports: bool = False
    portmapper_config: str | None = None
    proxy_url: str | None = None
    proxy_from_env: bool = False
    local_discovery: bool = False
    """Enable mDNS local network discovery. Nodes on the same LAN can find
    each other without relay servers. Default off.
    Env: ``ASTER_LOCAL_DISCOVERY``. TOML: ``[network] local_discovery``."""

    # ── Logging / observability ─────────────────────────────────────────

    log_format: str = "text"
    """Log output format: ``"json"`` for structured (k8s/ELK) or ``"text"`` for dev.
    Env: ``ASTER_LOG_FORMAT``. TOML: ``[logging] format``."""

    log_level: str = "info"
    """Log level: ``"debug"``, ``"info"``, ``"warning"``, ``"error"``.
    Env: ``ASTER_LOG_LEVEL``. TOML: ``[logging] level``."""

    log_mask: bool = True
    """Mask sensitive fields in logs (keys, credentials, endpoint IDs).
    Env: ``ASTER_LOG_MASK``. TOML: ``[logging] mask``."""

    # ── Identity file ────────────────────────────────────────────────────

    identity_file: str | None = None
    """Path to ``.aster-identity`` TOML file. When set, the node key and
    enrollment credentials are loaded from this file.
    Env: ``ASTER_IDENTITY_FILE``. Default: looks for ``.aster-identity``
    in the current working directory."""

    # ── Internal (not user-facing) ───────────────────────────────────────

    _sources: dict = dc_field(default_factory=dict, repr=False)
    """Provenance tracker: maps field name → source string
    (e.g. ``"ASTER_ROOT_PUBKEY_FILE"``, ``"aster.toml [trust]"``, ``"default"``)."""

    _ephemeral_privkey: bytes | None = dc_field(default=None, repr=False)
    """Transient private key generated in dev mode. Used only by
    ``simple_consumer`` to auto-mint a credential. Never persisted,
    never configurable. ``None`` in production."""

    # ── Resolve ──────────────────────────────────────────────────────────

    def resolve_root_pubkey(self) -> bytes | None:
        """Return the root public key, resolving from config sources.

        Resolution order:

        1. Inline ``root_pubkey`` (highest priority).
        2. ``root_pubkey_file`` (hex string or JSON with ``public_key``).
        3. Generate an ephemeral keypair (dev mode only -- logged).

        The root *private* key never appears here. In dev mode, a transient
        private key is stored on ``_ephemeral_privkey`` so the companion
        ``simple_consumer`` example can auto-mint a credential; it is never
        persisted or configurable.
        """
        if self.root_pubkey is not None:
            return self.root_pubkey

        if self.root_pubkey_file:
            pub = _load_pubkey_from_file(self.root_pubkey_file)
            if pub is not None:
                self.root_pubkey = pub
                return pub
            _config_logger.warning(
                "Root pubkey file %s not found or invalid", self.root_pubkey_file
            )

        # Dev mode: generate ephemeral keypair if admission is needed.
        if not self.allow_all_consumers or not self.allow_all_producers:
            from .trust.signing import generate_root_keypair

            priv, pub = generate_root_keypair()
            self._ephemeral_privkey = priv  # transient, never persisted
            self.root_pubkey = pub
            self._sources["root_pubkey"] = "ephemeral (dev mode)"
            _config_logger.info(
                "Generated ephemeral root key "
                "(set ASTER_ROOT_PUBKEY_FILE for production)"
            )
            return pub

        return None

    def load_identity(self, peer_name: str | None = None,
                      role: str | None = None) -> tuple[bytes | None, dict | None]:
        """Load a peer entry from the ``.aster-identity`` file.

        Args:
            peer_name: Select peer by ``name``. If None, auto-selects by
                ``role`` (first match).
            role: Fallback selector when ``peer_name`` is None.
                ``"producer"`` for AsterServer, ``"consumer"`` for AsterClient.

        Returns:
            ``(secret_key_bytes, peer_dict)`` or ``(None, None)`` when no
            identity file is found.
        """
        import base64 as _b64

        path = self._resolve_identity_path()
        if path is None:
            return None, None

        if sys.version_info >= (3, 11):
            import tomllib as _tl
        else:
            import tomli as _tl  # type: ignore[no-redef]

        with open(path, "rb") as f:
            data = _tl.load(f)

        node = data.get("node", {})
        secret_key_b64 = node.get("secret_key")
        secret_key = _b64.b64decode(secret_key_b64) if secret_key_b64 else None

        # Find the peer entry
        peers = data.get("peers", [])
        peer = None
        if peer_name is not None:
            for p in peers:
                if p.get("name") == peer_name:
                    peer = p
                    break
        elif role is not None:
            for p in peers:
                if p.get("role") == role:
                    peer = p
                    break
        elif peers:
            peer = peers[0]  # single peer → auto-select

        return secret_key, peer

    def _resolve_identity_path(self) -> str | None:
        """Find the .aster-identity file."""
        if self.identity_file:
            expanded = os.path.expanduser(self.identity_file)
            return expanded if os.path.exists(expanded) else None
        # Default: look in cwd
        default = os.path.join(os.getcwd(), ".aster-identity")
        return default if os.path.exists(default) else None

    def load_identity_from_path(
        self,
        path: str,
        peer_name: str | None = None,
        role: str | None = None,
    ) -> tuple[bytes | None, dict | None]:
        """Load secret_key + peer entry from a specific TOML identity file.

        Same return shape as :meth:`load_identity` but reads from an
        explicit path instead of consulting ``self.identity_file``. Used
        by ``AsterClient`` to fold the secret key out of an
        ``enrollment_credential_file`` when no separate ``identity=`` was
        provided -- both options should do the same thing for the same
        TOML file produced by ``aster enroll node``.
        """
        import base64 as _b64

        if sys.version_info >= (3, 11):
            import tomllib as _tl
        else:
            import tomli as _tl  # type: ignore[no-redef]

        with open(path, "rb") as f:
            data = _tl.load(f)

        node = data.get("node", {})
        secret_key_b64 = node.get("secret_key")
        secret_key = _b64.b64decode(secret_key_b64) if secret_key_b64 else None

        peers = data.get("peers", [])
        peer = None
        if peer_name is not None:
            for p in peers:
                if p.get("name") == peer_name:
                    peer = p
                    break
        elif role is not None:
            for p in peers:
                if p.get("role") == role:
                    peer = p
                    break
        elif peers:
            peer = peers[0]

        return secret_key, peer

    def to_endpoint_config(self) -> EndpointConfig | None:
        """Build an :class:`EndpointConfig` from the network fields.

        Returns ``None`` when all network fields are at their defaults
        (so ``IrohNode.memory_with_alpns`` can use the fast
        ``Endpoint::bind(presets::N0)`` path).
        """
        has_custom = any([
            self.relay_mode, self.secret_key, self.bind_addr,
            self.enable_monitoring, self.enable_hooks,
            self.clear_ip_transports, self.clear_relay_transports,
            self.portmapper_config, self.proxy_url, self.proxy_from_env,
            self.local_discovery,
            self.hook_timeout_ms != 5000,
        ])
        if not has_custom:
            return None
        return EndpointConfig(
            alpns=[],  # Router sets ALPNs
            relay_mode=self.relay_mode,
            secret_key=self.secret_key,
            enable_monitoring=self.enable_monitoring,
            enable_hooks=self.enable_hooks,
            hook_timeout_ms=self.hook_timeout_ms,
            bind_addr=self.bind_addr,
            clear_ip_transports=self.clear_ip_transports,
            clear_relay_transports=self.clear_relay_transports,
            portmapper_config=self.portmapper_config,
            proxy_url=self.proxy_url,
            proxy_from_env=self.proxy_from_env,
            enable_local_discovery=self.local_discovery,
        )

    # ── print_config ─────────────────────────────────────────────────────

    def print_config(self, *, json: bool = False) -> str:
        """Render the resolved configuration with provenance and masking.

        Sensitive fields (``secret_key``, ``enrollment_credential_file``)
        are masked. ``root_pubkey`` is public and shown in full.

        Args:
            json: If True, return a JSON string; otherwise a human-readable table.

        Returns:
            The rendered config string (also printed to stdout).
        """
        sections = {
            "trust": [
                ("root_pubkey", self._fmt_bytes(self.root_pubkey, mask=False)),
                ("root_pubkey_file", self.root_pubkey_file or "<not set>"),
                ("enrollment_credential_file", self._fmt_masked(self.enrollment_credential_file)),
                ("enrollment_credential_iid", self._fmt_masked(self.enrollment_credential_iid)),
                ("allow_all_consumers", self.allow_all_consumers),
                ("allow_all_producers", self.allow_all_producers),
            ],
            "connect": [
                ("endpoint_addr", self.endpoint_addr or "<not set>"),
            ],
            "network": [
                ("secret_key", self._fmt_bytes(self.secret_key, mask=True)),
                ("relay_mode", self.relay_mode or "<default>"),
                ("bind_addr", self.bind_addr or "<any>"),
                ("enable_monitoring", self.enable_monitoring),
                ("enable_hooks", self.enable_hooks),
            ],
            "storage": [
                ("path", self.storage_path or "<in-memory>"),
            ],
        }

        if json:
            out: dict = {}
            for section, fields in sections.items():
                out[section] = {}
                for name, value in fields:
                    source = self._sources.get(name, "default")
                    out[section][name] = {"value": value, "source": source}
            text = _json.dumps(out, indent=2, default=str)
        else:
            lines = []
            for section, fields in sections.items():
                lines.append(f"  [{section}]")
                for name, value in fields:
                    source = self._sources.get(name, "default")
                    lines.append(f"    {name:<28s}: {value!s:<36s} ({source})")
            text = "\n".join(lines)

        print(text)
        return text

    @staticmethod
    def _fmt_bytes(val: bytes | None, *, mask: bool) -> str:
        if val is None:
            return "<not set>"
        h = val.hex()
        if mask:
            return f"****...{h[-8:]}" if len(h) > 8 else "****"
        return h

    @staticmethod
    def _fmt_masked(val: str | None) -> str:
        if val is None:
            return "<not set>"
        return f"****...{val[-12:]}" if len(val) > 12 else "****"

    # ── Factory methods ──────────────────────────────────────────────────

    @classmethod
    def from_env(cls) -> "AsterConfig":
        """Build config from ``ASTER_*`` environment variables only."""
        return cls._load(toml_data=None, toml_path=None)

    @classmethod
    def from_file(cls, path: str | Path) -> "AsterConfig":
        """Build config from a TOML file, with env-var overrides."""
        with Path(path).open("rb") as fh:
            raw = tomllib.load(fh)
        return cls._load(toml_data=raw, toml_path=str(path))

    @classmethod
    def _load(cls, toml_data: dict | None, toml_path: str | None) -> "AsterConfig":
        kwargs: dict = {}
        sources: dict[str, str] = {}
        env = os.environ
        toml_label = toml_path or "aster.toml"

        def _set(field: str, value, source: str) -> None:
            kwargs[field] = value
            sources[field] = source

        # ── Trust (TOML [trust] section) ─────────────────────────────────
        trust = (toml_data or {}).get("trust", {})
        if "root_pubkey" in trust:
            raw = trust["root_pubkey"]
            _set("root_pubkey", bytes.fromhex(raw), f"{toml_label} [trust]")
        if "root_pubkey_file" in trust:
            _set("root_pubkey_file", str(trust["root_pubkey_file"]), f"{toml_label} [trust]")
        if "enrollment_credential" in trust:
            _set("enrollment_credential_file", str(trust["enrollment_credential"]), f"{toml_label} [trust]")
        if "enrollment_credential_iid" in trust:
            _set("enrollment_credential_iid", str(trust["enrollment_credential_iid"]), f"{toml_label} [trust]")
        if "allow_all_consumers" in trust:
            _set("allow_all_consumers", bool(trust["allow_all_consumers"]), f"{toml_label} [trust]")
        if "allow_all_producers" in trust:
            _set("allow_all_producers", bool(trust["allow_all_producers"]), f"{toml_label} [trust]")

        # ── Connect (TOML [connect] section) ─────────────────────────────
        connect = (toml_data or {}).get("connect", {})
        if "endpoint_addr" in connect:
            _set("endpoint_addr", str(connect["endpoint_addr"]), f"{toml_label} [connect]")

        # ── Storage (TOML [storage] section) ─────────────────────────────
        storage = (toml_data or {}).get("storage", {})
        if "path" in storage:
            _set("storage_path", str(storage["path"]), f"{toml_label} [storage]")

        # ── Network (TOML [network] section) ─────────────────────────────
        network = (toml_data or {}).get("network", {})
        _NET_FIELDS = ("relay_mode", "bind_addr", "portmapper_config", "proxy_url")
        for f in _NET_FIELDS:
            if f in network:
                _set(f, str(network[f]) if network[f] is not None else None, f"{toml_label} [network]")
        _NET_BOOLS = (
            "enable_monitoring", "enable_hooks",
            "clear_ip_transports", "clear_relay_transports", "proxy_from_env",
            "local_discovery",
        )
        for f in _NET_BOOLS:
            if f in network:
                _set(f, bool(network[f]), f"{toml_label} [network]")
        if "hook_timeout_ms" in network:
            _set("hook_timeout_ms", int(network["hook_timeout_ms"]), f"{toml_label} [network]")
        if "secret_key" in network and network["secret_key"] is not None:
            _set("secret_key", base64.b64decode(network["secret_key"]), f"{toml_label} [network]")

        # ── Logging (TOML [logging] section) ─────────────────────────────
        log_sec = (toml_data or {}).get("logging", {})
        if "format" in log_sec:
            _set("log_format", str(log_sec["format"]).lower(), f"{toml_label} [logging]")
        if "level" in log_sec:
            _set("log_level", str(log_sec["level"]).lower(), f"{toml_label} [logging]")
        if "mask" in log_sec:
            _set("log_mask", bool(log_sec["mask"]), f"{toml_label} [logging]")

        # ── Env overrides (always win) ───────────────────────────────────
        if (v := env.get("ASTER_ROOT_PUBKEY")) is not None:
            _set("root_pubkey", bytes.fromhex(v.strip()), "ASTER_ROOT_PUBKEY")
        if (v := env.get("ASTER_ROOT_PUBKEY_FILE")) is not None:
            _set("root_pubkey_file", v.strip(), "ASTER_ROOT_PUBKEY_FILE")
        if (v := env.get("ASTER_ENROLLMENT_CREDENTIAL")) is not None:
            _set("enrollment_credential_file", v.strip(), "ASTER_ENROLLMENT_CREDENTIAL")
        if (v := env.get("ASTER_ENROLLMENT_CREDENTIAL_IID")) is not None:
            _set("enrollment_credential_iid", v.strip(), "ASTER_ENROLLMENT_CREDENTIAL_IID")
        if (v := env.get("ASTER_ENDPOINT_ADDR")) is not None:
            _set("endpoint_addr", v.strip(), "ASTER_ENDPOINT_ADDR")
        if (v := env.get("ASTER_IDENTITY_FILE")) is not None:
            _set("identity_file", v.strip(), "ASTER_IDENTITY_FILE")
        if (v := env.get("ASTER_ALLOW_ALL_CONSUMERS")) is not None:
            _set("allow_all_consumers", _parse_bool(v, "ASTER_ALLOW_ALL_CONSUMERS"), "ASTER_ALLOW_ALL_CONSUMERS")
        if (v := env.get("ASTER_ALLOW_ALL_PRODUCERS")) is not None:
            _set("allow_all_producers", _parse_bool(v, "ASTER_ALLOW_ALL_PRODUCERS"), "ASTER_ALLOW_ALL_PRODUCERS")
        if (v := env.get("ASTER_STORAGE_PATH")) is not None:
            _set("storage_path", v.strip() or None, "ASTER_STORAGE_PATH")
        if (v := env.get("ASTER_SECRET_KEY")) is not None:
            _set("secret_key", base64.b64decode(v.strip()) if v.strip() else None, "ASTER_SECRET_KEY")
        if (v := env.get("ASTER_RELAY_MODE")) is not None:
            _set("relay_mode", v.strip() or None, "ASTER_RELAY_MODE")
        if (v := env.get("ASTER_BIND_ADDR")) is not None:
            _set("bind_addr", v.strip() or None, "ASTER_BIND_ADDR")
        for f in _NET_BOOLS:
            var = f"ASTER_{f.upper()}"
            if (v := env.get(var)) is not None:
                _set(f, _parse_bool(v, var), var)
        if (v := env.get("ASTER_HOOK_TIMEOUT_MS")) is not None:
            _set("hook_timeout_ms", int(v), "ASTER_HOOK_TIMEOUT_MS")
        for f in ("portmapper_config", "proxy_url"):
            var = f"ASTER_{f.upper()}"
            if (v := env.get(var)) is not None:
                _set(f, v.strip() or None, var)

        # Logging env overrides
        if (v := env.get("ASTER_LOG_FORMAT")) is not None:
            _set("log_format", v.strip().lower(), "ASTER_LOG_FORMAT")
        if (v := env.get("ASTER_LOG_LEVEL")) is not None:
            _set("log_level", v.strip().lower(), "ASTER_LOG_LEVEL")
        if (v := env.get("ASTER_LOG_MASK")) is not None:
            _set("log_mask", _parse_bool(v, "ASTER_LOG_MASK"), "ASTER_LOG_MASK")

        obj = cls(**{k: v for k, v in kwargs.items() if k != "_sources"})
        obj._sources = sources
        return obj
402 def resolve_root_pubkey(self) -> bytes | None: 403 """Return the root public key, resolving from config sources. 404 405 Resolution order: 406 407 1. Inline ``root_pubkey`` (highest priority). 408 2. ``root_pubkey_file`` (hex string or JSON with ``public_key``). 409 3. Generate an ephemeral keypair (dev mode only -- logged). 410 411 The root *private* key never appears here. In dev mode, a transient 412 private key is stored on ``_ephemeral_privkey`` so the companion 413 ``simple_consumer`` example can auto-mint a credential; it is never 414 persisted or configurable. 415 """ 416 if self.root_pubkey is not None: 417 return self.root_pubkey 418 419 if self.root_pubkey_file: 420 pub = _load_pubkey_from_file(self.root_pubkey_file) 421 if pub is not None: 422 self.root_pubkey = pub 423 return pub 424 _config_logger.warning( 425 "Root pubkey file %s not found or invalid", self.root_pubkey_file 426 ) 427 428 # Dev mode: generate ephemeral keypair if admission is needed. 429 if not self.allow_all_consumers or not self.allow_all_producers: 430 from .trust.signing import generate_root_keypair 431 432 priv, pub = generate_root_keypair() 433 self._ephemeral_privkey = priv # transient, never persisted 434 self.root_pubkey = pub 435 self._sources["root_pubkey"] = "ephemeral (dev mode)" 436 _config_logger.info( 437 "Generated ephemeral root key " 438 "(set ASTER_ROOT_PUBKEY_FILE for production)" 439 ) 440 return pub 441 442 return None
Return the root public key, resolving from config sources.
Resolution order:
1. Inline root_pubkey (highest priority).
2. root_pubkey_file (hex string or JSON with public_key).
3. Generate an ephemeral keypair (dev mode only -- logged).
The root private key never appears here. In dev mode, a transient
private key is stored on _ephemeral_privkey so the companion
simple_consumer example can auto-mint a credential; it is never
persisted or configurable.
def load_identity(self, peer_name: str | None = None,
                  role: str | None = None) -> tuple[bytes | None, dict | None]:
    """Load a peer entry from the ``.aster-identity`` file.

    Args:
        peer_name: Select peer by ``name``. If None, auto-selects by
            ``role`` (first match).
        role: Fallback selector when ``peer_name`` is None.
            ``"producer"`` for AsterServer, ``"consumer"`` for AsterClient.

    Returns:
        ``(secret_key_bytes, peer_dict)`` or ``(None, None)`` when no
        identity file is found.
    """
    import base64 as _b64

    path = self._resolve_identity_path()
    if path is None:
        return None, None

    if sys.version_info >= (3, 11):
        import tomllib as _tl
    else:
        import tomli as _tl  # type: ignore[no-redef]

    with open(path, "rb") as f:
        data = _tl.load(f)

    node = data.get("node", {})
    secret_key_b64 = node.get("secret_key")
    secret_key = _b64.b64decode(secret_key_b64) if secret_key_b64 else None

    # Find the peer entry
    peers = data.get("peers", [])
    peer = None
    if peer_name is not None:
        for p in peers:
            if p.get("name") == peer_name:
                peer = p
                break
    elif role is not None:
        for p in peers:
            if p.get("role") == role:
                peer = p
                break
    elif peers:
        peer = peers[0]  # single peer → auto-select

    return secret_key, peer
Load a peer entry from the .aster-identity file.
Args:
peer_name: Select peer by name. If None, auto-selects by
role (first match).
role: Fallback selector when peer_name is None.
"producer" for AsterServer, "consumer" for AsterClient.
Returns:
(secret_key_bytes, peer_dict) or (None, None) when no
identity file is found.
def load_identity_from_path(
    self,
    path: str,
    peer_name: str | None = None,
    role: str | None = None,
) -> tuple[bytes | None, dict | None]:
    """Load secret_key + peer entry from a specific TOML identity file.

    Same return shape as :meth:`load_identity` but reads from an
    explicit path instead of consulting ``self.identity_file``. Used
    by ``AsterClient`` to fold the secret key out of an
    ``enrollment_credential_file`` when no separate ``identity=`` was
    provided -- both options should do the same thing for the same
    TOML file produced by ``aster enroll node``.
    """
    import base64 as _b64

    if sys.version_info >= (3, 11):
        import tomllib as _tl
    else:
        import tomli as _tl  # type: ignore[no-redef]

    with open(path, "rb") as f:
        data = _tl.load(f)

    node = data.get("node", {})
    secret_key_b64 = node.get("secret_key")
    secret_key = _b64.b64decode(secret_key_b64) if secret_key_b64 else None

    peers = data.get("peers", [])
    peer = None
    if peer_name is not None:
        for p in peers:
            if p.get("name") == peer_name:
                peer = p
                break
    elif role is not None:
        for p in peers:
            if p.get("role") == role:
                peer = p
                break
    elif peers:
        peer = peers[0]

    return secret_key, peer
Load secret_key + peer entry from a specific TOML identity file.
Same return shape as load_identity() but reads from an
explicit path instead of consulting self.identity_file. Used
by AsterClient to fold the secret key out of an
enrollment_credential_file when no separate identity= was
provided -- both options should do the same thing for the same
TOML file produced by aster enroll node.
def to_endpoint_config(self) -> EndpointConfig | None:
    """Build an :class:`EndpointConfig` from the network fields.

    Returns ``None`` when all network fields are at their defaults
    (so ``IrohNode.memory_with_alpns`` can use the fast
    ``Endpoint::bind(presets::N0)`` path).
    """
    has_custom = any([
        self.relay_mode, self.secret_key, self.bind_addr,
        self.enable_monitoring, self.enable_hooks,
        self.clear_ip_transports, self.clear_relay_transports,
        self.portmapper_config, self.proxy_url, self.proxy_from_env,
        self.local_discovery,
        self.hook_timeout_ms != 5000,
    ])
    if not has_custom:
        return None
    return EndpointConfig(
        alpns=[],  # Router sets ALPNs
        relay_mode=self.relay_mode,
        secret_key=self.secret_key,
        enable_monitoring=self.enable_monitoring,
        enable_hooks=self.enable_hooks,
        hook_timeout_ms=self.hook_timeout_ms,
        bind_addr=self.bind_addr,
        clear_ip_transports=self.clear_ip_transports,
        clear_relay_transports=self.clear_relay_transports,
        portmapper_config=self.portmapper_config,
        proxy_url=self.proxy_url,
        proxy_from_env=self.proxy_from_env,
        enable_local_discovery=self.local_discovery,
    )
Build an EndpointConfig from the network fields.
Returns None when all network fields are at their defaults
(so IrohNode.memory_with_alpns can use the fast
Endpoint::bind(presets::N0) path).
def print_config(self, *, json: bool = False) -> str:
    """Render the resolved configuration with provenance and masking.

    Sensitive fields (``secret_key``, ``enrollment_credential_file``)
    are masked. ``root_pubkey`` is public and shown in full.

    Args:
        json: If True, return a JSON string; otherwise a human-readable table.

    Returns:
        The rendered config string (also printed to stdout).
    """
    sections = {
        "trust": [
            ("root_pubkey", self._fmt_bytes(self.root_pubkey, mask=False)),
            ("root_pubkey_file", self.root_pubkey_file or "<not set>"),
            ("enrollment_credential_file", self._fmt_masked(self.enrollment_credential_file)),
            ("enrollment_credential_iid", self._fmt_masked(self.enrollment_credential_iid)),
            ("allow_all_consumers", self.allow_all_consumers),
            ("allow_all_producers", self.allow_all_producers),
        ],
        "connect": [
            ("endpoint_addr", self.endpoint_addr or "<not set>"),
        ],
        "network": [
            ("secret_key", self._fmt_bytes(self.secret_key, mask=True)),
            ("relay_mode", self.relay_mode or "<default>"),
            ("bind_addr", self.bind_addr or "<any>"),
            ("enable_monitoring", self.enable_monitoring),
            ("enable_hooks", self.enable_hooks),
        ],
        "storage": [
            ("path", self.storage_path or "<in-memory>"),
        ],
    }

    if json:
        out: dict = {}
        for section, fields in sections.items():
            out[section] = {}
            for name, value in fields:
                source = self._sources.get(name, "default")
                out[section][name] = {"value": value, "source": source}
        text = _json.dumps(out, indent=2, default=str)
    else:
        lines = []
        for section, fields in sections.items():
            lines.append(f"  [{section}]")
            for name, value in fields:
                source = self._sources.get(name, "default")
                lines.append(f"    {name:<28s}: {value!s:<36s} ({source})")
        text = "\n".join(lines)

    print(text)
    return text
Render the resolved configuration with provenance and masking.
Sensitive fields (secret_key, enrollment_credential_file)
are masked. root_pubkey is public and shown in full.
Args:
json: If True, return a JSON string; otherwise a human-readable table.
Returns:
The rendered config string (also printed to stdout).
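The table layout pairs each field with its provenance in a fixed-width "name : value (source)" row. The snippet below reproduces just that formatting loop with sample values (the section contents here are illustrative, not real config):

```python
# Sample data standing in for the resolved config and its _sources map.
sections = {
    "trust": [("root_pubkey", "<not set>"), ("allow_all_consumers", True)],
    "network": [("secret_key", "****"), ("relay_mode", "<default>")],
}
sources = {"secret_key": "env", "relay_mode": "default"}

lines = []
for section, fields in sections.items():
    lines.append(f"  [{section}]")                       # section header
    for name, value in fields:
        source = sources.get(name, "default")            # provenance tag
        lines.append(f"    {name:<28s}: {value!s:<36s} ({source})")
print("\n".join(lines))
```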
@classmethod
def from_env(cls) -> "AsterConfig":
    """Build config from ``ASTER_*`` environment variables only."""
    return cls._load(toml_data=None, toml_path=None)
Build config from ASTER_* environment variables only.
@classmethod
def from_file(cls, path: str | Path) -> "AsterConfig":
    """Build config from a TOML file, with env-var overrides."""
    with Path(path).open("rb") as fh:
        raw = tomllib.load(fh)
    return cls._load(toml_data=raw, toml_path=str(path))
Build config from a TOML file, with env-var overrides.
def service(
    name_or_cls=None,
    *,
    name: str | None = None,
    version: int = 1,
    serialization: list[SerializationMode] | SerializationMode | None = None,
    scoped: str = "shared",
    interceptors: list[type] | None = None,
    max_concurrent_streams: int | None = None,
    requires: CapabilityRequirement | None = None,
    public: bool = False,
    metadata: Any = None,
) -> Callable[[type], type] | type:
    """Class decorator to mark a class as an Aster RPC service.

    Supports three calling forms::

        @service                                  # bare -- name = class name
        @service("AgentControl")                  # explicit name
        @service(name="AgentControl", version=2)  # keyword name
        @service(version=2, scoped="session")     # keyword-only, name = class name

    Args:
        name_or_cls: Service name (str), or the class itself when used as
            bare ``@service`` without parentheses. Defaults to the class name.
        name: Explicit service name (keyword-only alias for name_or_cls).
        version: The service version (default: 1).
        serialization: Supported serialization modes. Defaults to [XLANG].
        scoped: Service scope: "shared" or "session". Default "shared".
            The legacy alias "stream" is still accepted on input.
        interceptors: List of interceptor classes to apply to all methods.
        max_concurrent_streams: Maximum concurrent QUIC streams per
            connection (i.e. per client peer). When a client exceeds
            this limit, additional streams are rejected at the QUIC
            layer. This applies to the server as a whole, not
            per-service. ``None`` means unlimited.

    Returns:
        A decorator function (or the decorated class if used bare).
    """
    # Merge positional name_or_cls with keyword name=
    if name is not None and name_or_cls is None:
        name_or_cls = name

    # @service (bare, no parens) -- name_or_cls is the class itself
    if isinstance(name_or_cls, type):
        return _apply_service_decorator(
            name_or_cls,
            name=name_or_cls.__name__,
            version=version,
            serialization=serialization,
            scoped=scoped,
            interceptors=interceptors,
            max_concurrent_streams=max_concurrent_streams,
            requires=requires,
            public=public,
            metadata=metadata,
        )

    # @service() or @service("Foo") or @service(name="Foo", version=2)
    name = name_or_cls  # str | None

    def decorator(cls: type) -> type:
        return _apply_service_decorator(
            cls,
            name=name if name is not None else cls.__name__,
            version=version,
            serialization=serialization,
            scoped=scoped,
            interceptors=interceptors,
            max_concurrent_streams=max_concurrent_streams,
            requires=requires,
            public=public,
            metadata=metadata,
        )

    return decorator
Class decorator to mark a class as an Aster RPC service.
Supports three calling forms::
@service # bare -- name = class name
@service("AgentControl") # explicit name
@service(name="AgentControl", version=2) # keyword name
@service(version=2, scoped="session") # keyword-only, name = class name
Args:
name_or_cls: Service name (str), or the class itself when used as
bare @service without parentheses. Defaults to the class name.
name: Explicit service name (keyword-only alias for name_or_cls).
version: The service version (default: 1).
serialization: Supported serialization modes. Defaults to [XLANG].
scoped: Service scope: "shared" or "session". Default "shared".
The legacy alias "stream" is still accepted on input.
interceptors: List of interceptor classes to apply to all methods.
max_concurrent_streams: Maximum concurrent QUIC streams per
connection (i.e. per client peer). When a client exceeds
this limit, additional streams are rejected at the QUIC
layer. This applies to the server as a whole, not
per-service. None means unlimited.
Returns: A decorator function (or the decorated class if used bare).
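The calling forms are disambiguated by checking whether the first positional argument is already a class (bare use) or a name string. A minimal stand-alone sketch of that dispatch, with mark() as a stand-in for the internal _apply_service_decorator:

```python
# Stand-in for _apply_service_decorator: just records name and version.
def mark(cls, *, name, version):
    cls.__service_name__ = name
    cls.__service_version__ = version
    return cls

def service(name_or_cls=None, *, name=None, version=1):
    # Merge positional name with keyword name=
    if name is not None and name_or_cls is None:
        name_or_cls = name
    # Bare @service: the class itself arrives as the first argument.
    if isinstance(name_or_cls, type):
        return mark(name_or_cls, name=name_or_cls.__name__, version=version)
    svc_name = name_or_cls  # str | None

    def decorator(cls):
        return mark(cls, name=svc_name or cls.__name__, version=version)
    return decorator

@service                       # bare -- name = class name
class Echo: ...

@service("AgentControl")       # explicit positional name
class Control: ...

@service(name="Greeter", version=2)
class Greet: ...
```

The real decorator routes through the same isinstance(name_or_cls, type) check before delegating.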
def server_stream(
    method_or_timeout: Callable[P, AsyncIterator[Any]] | float | None = None,
    name: str | None = None,
    timeout: float | None = None,
    serialization: SerializationMode | None = None,
    requires: CapabilityRequirement | None = None,
    metadata: Any = None,
) -> Callable[P, AsyncIterator[Any]]:
    """Decorator to mark a method as a server-streaming RPC.

    Can be used as:
        @server_stream                # without parens
        @server_stream()              # with parens
        @server_stream(timeout=30.0)  # with options

    Args:
        method_or_timeout: Either the method (when used without parens) or a timeout value.
        timeout: Optional timeout in seconds.
        serialization: Override the serialization mode for this method.

    Returns:
        A decorator function or the decorated method.

    Example::

        @service(name="MyService", version=1)
        class MyService:
            @server_stream
            async def watch_items(self, req: WatchRequest) -> AsyncIterator[ItemUpdate]:
                for item in items:
                    yield ItemUpdate(item=item)
    """
    # Handle @server_stream (no parens) - method is passed directly
    if callable(method_or_timeout):
        method = method_or_timeout
        _apply_server_stream_decorator(method, name=name, timeout=timeout,
                                       serialization=serialization,
                                       requires=requires, metadata=metadata)
        return method

    # Handle @server_stream() or @server_stream(timeout=...)
    actual_timeout = method_or_timeout if method_or_timeout is not None else timeout

    def decorator(method: Callable[P, AsyncIterator[Any]]) -> Callable[P, AsyncIterator[Any]]:
        _apply_server_stream_decorator(method, name=name, timeout=actual_timeout,
                                       serialization=serialization,
                                       requires=requires, metadata=metadata)
        return method

    return decorator
Decorator to mark a method as a server-streaming RPC.
Can be used as:
@server_stream # without parens
@server_stream() # with parens
@server_stream(timeout=30.0) # with options
Args:
method_or_timeout: Either the method (when used without parens) or a timeout value.
timeout: Optional timeout in seconds.
serialization: Override the serialization mode for this method.
Returns: A decorator function or the decorated method.
Example::
@service(name="MyService", version=1)
class MyService:
@server_stream
async def watch_items(self, req: WatchRequest) -> AsyncIterator[ItemUpdate]:
for item in items:
yield ItemUpdate(item=item)
def client_stream(
    method_or_timeout: Callable[P, Any] | float | None = None,
    name: str | None = None,
    idempotent: bool = False,
    serialization: SerializationMode | None = None,
    requires: CapabilityRequirement | None = None,
    metadata: Any = None,
) -> Callable[P, Any] | Callable:
    """Decorator to mark a method as a client-streaming RPC.

    Can be used as:
        @client_stream                # without parens
        @client_stream()              # with parens
        @client_stream(timeout=30.0)  # with options

    Args:
        method_or_timeout: Either the method (when used without parens) or a timeout value.
        idempotent: Whether the method is safe to retry.
        serialization: Override the serialization mode for this method.

    Returns:
        A decorator function or the decorated method.

    Example::

        @service(name="MyService", version=1)
        class MyService:
            @client_stream
            async def aggregate(self, reqs: AsyncIterator[NumberRequest]) -> SumResponse:
                total = 0
                async for req in reqs:
                    total += req.value
                return SumResponse(total=total)
    """
    # Handle @client_stream (no parens) - method is passed directly
    if callable(method_or_timeout):
        method = method_or_timeout
        _apply_client_stream_decorator(method, name=name, idempotent=idempotent,
                                       serialization=serialization,
                                       requires=requires, metadata=metadata)
        return method

    # Handle @client_stream() or @client_stream(timeout=...)
    timeout = method_or_timeout

    def decorator(method: Callable[P, Any]) -> Callable[P, Any]:
        _apply_client_stream_decorator(method, name=name, timeout=timeout,
                                       idempotent=idempotent,
                                       serialization=serialization,
                                       requires=requires, metadata=metadata)
        return method

    return decorator
Decorator to mark a method as a client-streaming RPC.
Can be used as:
@client_stream # without parens
@client_stream() # with parens
@client_stream(timeout=30.0) # with options
Args:
method_or_timeout: Either the method (when used without parens) or a timeout value.
idempotent: Whether the method is safe to retry.
serialization: Override the serialization mode for this method.
Returns: A decorator function or the decorated method.
Example::
@service(name="MyService", version=1)
class MyService:
@client_stream
async def aggregate(self, reqs: AsyncIterator[NumberRequest]) -> SumResponse:
total = 0
async for req in reqs:
total += req.value
return SumResponse(total=total)
def bidi_stream(
    method_or_timeout: Callable[P, AsyncIterator[Any]] | float | None = None,
    name: str | None = None,
    timeout: float | None = None,
    serialization: SerializationMode | None = None,
    requires: CapabilityRequirement | None = None,
    metadata: Any = None,
) -> Callable[P, AsyncIterator[Any]]:
    """Decorator to mark a method as a bidirectional-streaming RPC.

    Can be used as:
        @bidi_stream                # without parens
        @bidi_stream()              # with parens
        @bidi_stream(timeout=30.0)  # with options

    Args:
        method_or_timeout: Either the method (when used without parens) or a timeout value.
        timeout: Optional timeout in seconds.
        serialization: Override the serialization mode for this method.

    Returns:
        A decorator function or the decorated method.

    Example::

        @service(name="MyService", version=1)
        class MyService:
            @bidi_stream
            async def chat(
                self, requests: AsyncIterator[ChatMessage]
            ) -> AsyncIterator[ChatMessage]:
                async for req in requests:
                    yield ChatMessage(text=f"echo: {req.text}")
    """
    # Handle @bidi_stream (no parens) - method is passed directly
    if callable(method_or_timeout):
        method = method_or_timeout
        _apply_bidi_stream_decorator(method, name=name, timeout=timeout,
                                     serialization=serialization,
                                     requires=requires, metadata=metadata)
        return method

    # Handle @bidi_stream() or @bidi_stream(timeout=...)
    actual_timeout = method_or_timeout if method_or_timeout is not None else timeout

    def decorator(method: Callable[P, AsyncIterator[Any]]) -> Callable[P, AsyncIterator[Any]]:
        _apply_bidi_stream_decorator(method, name=name, timeout=actual_timeout,
                                     serialization=serialization,
                                     requires=requires, metadata=metadata)
        return method

    return decorator
Decorator to mark a method as a bidirectional-streaming RPC.
Can be used as:
@bidi_stream # without parens
@bidi_stream() # with parens
@bidi_stream(timeout=30.0) # with options
Args:
method_or_timeout: Either the method (when used without parens) or a timeout value.
timeout: Optional timeout in seconds.
serialization: Override the serialization mode for this method.
Returns: A decorator function or the decorated method.
Example::
@service(name="MyService", version=1)
class MyService:
@bidi_stream
async def chat(
self, requests: AsyncIterator[ChatMessage]
) -> AsyncIterator[ChatMessage]:
async for req in requests:
yield ChatMessage(text=f"echo: {req.text}")
def wire_type(tag: str, *, metadata: dict | None = None):
    """Declare a stable wire identity for a dataclass.

    The *tag* string is split on the last ``/`` into
    ``(namespace, typename)``. If there is no ``/``
    the namespace is the empty string.

    Use ``@wire_type`` when you need explicit control over the wire
    identity of a type (production services, cross-language compat).
    If omitted, the ``@service`` decorator auto-derives a tag from the
    module + class name at decoration time.

    Args:
        tag: Wire type tag (e.g., ``"billing/Invoice"``).
        metadata: Optional dict mapping field names to ``Metadata`` objects
            for describing individual fields to AI agents.

    Example::

        @wire_type("billing/Invoice")
        @dataclass
        class Invoice:
            customer: str = ""
            amount: float = 0.0
            paid: bool = False

    Serialization rules:

    - **Supported field types:** ``str``, ``int``, ``float``, ``bool``,
      ``bytes``, ``list[T]``, ``dict[str, V]``, ``Optional[T]``,
      and other ``@wire_type`` dataclasses.

    - **Use ``Optional[T]`` for nullable fields, NOT ``T | None``.**
      The Fory serializer does not support PEP 604 union syntax.
      This will fail at runtime::

          bio: str | None = None  # WRONG -- Fory can't serialize this

      Use this instead::

          bio: Optional[str] = None  # CORRECT

    - **Every field must have a default value.** Fory XLANG needs defaults
      for cross-language compatibility. Use ``""`` for strings, ``0`` for
      ints, ``False`` for bools, and ``field(default_factory=list)`` for
      collections.

    - **``@rpc`` methods must return a ``@wire_type`` dataclass, not None.**
      Even if your response has no fields, define an empty response type::

          @wire_type("myapp/AckResult")
          @dataclass
          class AckResult:
              ok: bool = True

    - **Generic wrapper types** (e.g., ``SignedRequest[T]``) are supported.
      The ``@wire_type`` tag is on the outer class; the type parameter is
      resolved at manifest extraction time.
    """

    def decorator(cls):
        parts = tag.rsplit("/", 1)
        if len(parts) == 2:
            cls.__fory_namespace__ = parts[0]
            cls.__fory_typename__ = parts[1]
        else:
            cls.__fory_namespace__ = ""
            cls.__fory_typename__ = tag
        cls.__wire_type__ = tag
        if metadata:
            cls.__wire_type_field_metadata__ = metadata
        return cls

    return decorator
Declare a stable wire identity for a dataclass.
The tag string is split on the last / into
(namespace, typename). If there is no /
the namespace is the empty string.
Use @wire_type when you need explicit control over the wire
identity of a type (production services, cross-language compat).
If omitted, the @service decorator auto-derives a tag from the
module + class name at decoration time.
Args:
tag: Wire type tag (e.g., "billing/Invoice").
metadata: Optional dict mapping field names to Metadata objects
for describing individual fields to AI agents.
Example::
@wire_type("billing/Invoice")
@dataclass
class Invoice:
customer: str = ""
amount: float = 0.0
paid: bool = False
Serialization rules:
- Supported field types: str, int, float, bool, bytes, list[T], dict[str, V], Optional[T], and other @wire_type dataclasses.
- Use Optional[T] for nullable fields, NOT T | None. The Fory serializer does not support PEP 604 union syntax. This will fail at runtime::

bio: str | None = None  # WRONG -- Fory can't serialize this

Use this instead::

bio: Optional[str] = None  # CORRECT

- Every field must have a default value. Fory XLANG needs defaults for cross-language compatibility. Use "" for strings, 0 for ints, False for bools, and field(default_factory=list) for collections.
- @rpc methods must return a @wire_type dataclass, not None. Even if your response has no fields, define an empty response type::

@wire_type("myapp/AckResult")
@dataclass
class AckResult:
    ok: bool = True

- Generic wrapper types (e.g., SignedRequest[T]) are supported. The @wire_type tag is on the outer class; the type parameter is resolved at manifest extraction time.
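The tag split and the field rules can be exercised without the aster package. The decorator below re-implements only the rsplit("/", 1) logic shown in the source, and the dataclass follows every rule above (all fields defaulted, Optional[T] rather than T | None, default_factory for the list):

```python
from dataclasses import dataclass, field
from typing import Optional

# Local re-implementation of the tag split performed by @wire_type:
# the tag splits on the LAST "/" into (namespace, typename); with no
# "/" the namespace is the empty string. (metadata= handling omitted.)
def wire_type(tag: str):
    def decorator(cls):
        parts = tag.rsplit("/", 1)
        if len(parts) == 2:
            cls.__fory_namespace__, cls.__fory_typename__ = parts
        else:
            cls.__fory_namespace__, cls.__fory_typename__ = "", tag
        cls.__wire_type__ = tag
        return cls
    return decorator

@wire_type("billing/LineItem")
@dataclass
class LineItem:
    sku: str = ""                             # every field has a default
    quantity: int = 0
    note: Optional[str] = None                # Optional[T], not T | None
    tags: list = field(default_factory=list)  # mutable default via factory

print(LineItem.__fory_namespace__, LineItem.__fory_typename__)
```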
def any_of(*roles: str) -> CapabilityRequirement:
    """Require ANY ONE of the listed roles (OR logic).

    The caller is admitted if they have at least one of the specified roles.
    Accepts plain strings or str enum values.

    Example::

        @rpc(requires=any_of("ops.logs", "ops.admin"))
        async def tail_logs(self, req): ...

        @rpc(requires=any_of(Role.LOGS, Role.ADMIN))
        async def tail_logs(self, req): ...
    """
    return CapabilityRequirement(
        kind=CapabilityKind.ANY_OF,
        roles=[_role_value(r) for r in roles],
    )
Require ANY ONE of the listed roles (OR logic).
The caller is admitted if they have at least one of the specified roles. Accepts plain strings or str enum values.
Example::
@rpc(requires=any_of("ops.logs", "ops.admin"))
async def tail_logs(self, req): ...
@rpc(requires=any_of(Role.LOGS, Role.ADMIN))
async def tail_logs(self, req): ...
def all_of(*roles: str) -> CapabilityRequirement:
    """Require ALL of the listed roles (AND logic).

    The caller is admitted only if they have every specified role.
    Accepts plain strings or str enum values.

    Example::

        @rpc(requires=all_of("ops.status", "ops.admin"))
        async def admin_status(self, req): ...

        @rpc(requires=all_of(Role.STATUS, Role.ADMIN))
        async def admin_status(self, req): ...
    """
    return CapabilityRequirement(
        kind=CapabilityKind.ALL_OF,
        roles=[_role_value(r) for r in roles],
    )
Require ALL of the listed roles (AND logic).
The caller is admitted only if they have every specified role. Accepts plain strings or str enum values.
Example::
@rpc(requires=all_of("ops.status", "ops.admin"))
async def admin_status(self, req): ...
@rpc(requires=all_of(Role.STATUS, Role.ADMIN))
async def admin_status(self, req): ...
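The OR/AND semantics can be illustrated with plain sets standing in for CapabilityRequirement (hypothetical helper shapes, not the aster API): any_of is satisfied by a non-empty intersection, all_of by a subset check.

```python
# Sketch only: tuples of (kind, role-set) stand in for the real
# CapabilityRequirement / CapabilityKind objects.
def any_of(*roles):
    return ("any", set(roles))

def all_of(*roles):
    return ("all", set(roles))

def satisfied(requirement, caller_roles) -> bool:
    kind, roles = requirement
    caller = set(caller_roles)
    # OR: at least one shared role; AND: every required role present.
    return bool(roles & caller) if kind == "any" else roles <= caller

req_or = any_of("ops.logs", "ops.admin")
req_and = all_of("ops.status", "ops.admin")

print(satisfied(req_or, {"ops.logs"}))     # one matching role suffices
print(satisfied(req_and, {"ops.status"}))  # ops.admin missing: denied
```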
class SerializationMode(IntEnum):
    """Serialization protocol negotiated between client and server."""

    XLANG = 0
    NATIVE = 1
    ROW = 2
    JSON = 3
Serialization protocol negotiated between client and server.
class RpcError(Exception):
    """Exception raised when an RPC call fails.

    Catch this in client code to handle server-side errors::

        from aster import RpcError, StatusCode

        try:
            resp = await svc.my_method(request)
        except RpcError as e:
            print(f"RPC failed: {e.code.name} -- {e.message}")
            if e.details:
                print(f"Details: {e.details}")

    Attributes:
        code: The :class:`StatusCode` describing the failure category.
        message: A human-readable error description.
        details: Arbitrary string key/value pairs carrying extra context.
    """

    def __init__(
        self,
        code: StatusCode,
        message: str = "",
        details: dict[str, str] | None = None,
    ) -> None:
        self.code = code
        self.message = message
        self.details: dict[str, str] = details or {}
        super().__init__(f"[{code.name}] {message}")

    def __repr__(self) -> str:
        return (
            f"RpcError(code={self.code!r}, message={self.message!r}, "
            f"details={self.details!r})"
        )

    @classmethod
    def from_status(
        cls,
        code: StatusCode,
        message: str = "",
        details: dict[str, str] | None = None,
    ) -> "RpcError":
        """Create the most specific RpcError subclass for a status code."""
        exc_type = _RPC_ERROR_TYPES.get(code, RpcError)
        return exc_type(message=message, details=details)
Exception raised when an RPC call fails.
Catch this in client code to handle server-side errors::
from aster import RpcError, StatusCode
try:
resp = await svc.my_method(request)
except RpcError as e:
print(f"RPC failed: {e.code.name} -- {e.message}")
if e.details:
print(f"Details: {e.details}")
Attributes:
code: The StatusCode describing the failure category.
message: A human-readable error description.
details: Arbitrary string key/value pairs carrying extra context.
@classmethod
def from_status(
    cls,
    code: StatusCode,
    message: str = "",
    details: dict[str, str] | None = None,
) -> "RpcError":
    """Create the most specific RpcError subclass for a status code."""
    exc_type = _RPC_ERROR_TYPES.get(code, RpcError)
    return exc_type(message=message, details=details)
Create the most specific RpcError subclass for a status code.
class StatusCode(IntEnum):
    """RPC status codes.

    Codes 0-16 mirror gRPC's ``google.rpc.Code`` semantically. Codes
    100+ are Aster-native and have no gRPC equivalent. The 17-99
    range is reserved as a buffer in case gRPC ever extends its enum.

    A common gripe with gRPC's status codes is that there are too
    few of them to express the variety of failures real services
    actually hit -- everything ends up shoehorned into a handful of
    overloaded categories. The 100+ space gives Aster room to mint
    more precise codes over time, signalling clearly that they are
    intentionally separate from the gRPC vocabulary.

    Use these to inspect errors returned by the server::

        try:
            resp = await svc.get_status(req)
        except RpcError as e:
            if e.code == StatusCode.NOT_FOUND:
                print("Service not found")
            elif e.code == StatusCode.CONTRACT_VIOLATION:
                print("Wire shape doesn't match the contract")
    """

    # ── gRPC-mirrored codes (0-16) ───────────────────────────────────
    OK = 0                   #: Success.
    CANCELLED = 1            #: The call was cancelled by the client.
    UNKNOWN = 2              #: Unknown error (server bug or unhandled exception).
    INVALID_ARGUMENT = 3     #: Client sent an invalid request.
    DEADLINE_EXCEEDED = 4    #: The call timed out.
    NOT_FOUND = 5            #: Requested resource does not exist.
    ALREADY_EXISTS = 6       #: Resource already exists (e.g., duplicate create).
    PERMISSION_DENIED = 7    #: Caller lacks required capability.
    RESOURCE_EXHAUSTED = 8   #: Rate limit or quota exceeded.
    FAILED_PRECONDITION = 9  #: Precondition not met (e.g., wrong state).
    ABORTED = 10             #: Operation aborted (e.g., concurrency conflict).
    OUT_OF_RANGE = 11        #: Value outside valid range.
    UNIMPLEMENTED = 12       #: Method not implemented by the server.
    INTERNAL = 13            #: Internal server error.
    UNAVAILABLE = 14         #: Server temporarily unavailable (retry later).
    DATA_LOSS = 15           #: Unrecoverable data loss.
    UNAUTHENTICATED = 16     #: No valid credentials provided.

    # ── Reserved range (17-99) ───────────────────────────────────────
    # Intentionally left empty -- reserved as a buffer in case gRPC
    # ever extends its enum upward. New Aster-native codes start at
    # 101 and grow from there.

    # ── Aster-native codes (100+) ────────────────────────────────────
    #: The wire payload doesn't match the published contract -- e.g.
    #: the JSON dict has fields the server's @wire_type dataclass
    #: doesn't declare, or vice versa. Distinct from INVALID_ARGUMENT
    #: because the violation is about the SHAPE of the data, not its
    #: value, and because shape violations can occur at any nesting
    #: depth (a top-level INVALID_ARGUMENT label doesn't apply when
    #: the bad field is two objects deep). The producer owns the
    #: contract; consumers must use the field names defined by the
    #: producer's manifest.
    CONTRACT_VIOLATION = 101
RPC status codes.
Codes 0-16 mirror gRPC's google.rpc.Code semantically. Codes
100+ are Aster-native and have no gRPC equivalent. The 17-99
range is reserved as a buffer in case gRPC ever extends its enum.
A common gripe with gRPC's status codes is that there are too few of them to express the variety of failures real services actually hit -- everything ends up shoehorned into a handful of overloaded categories. The 100+ space gives Aster room to mint more precise codes over time, signalling clearly that they are intentionally separate from the gRPC vocabulary.
Use these to inspect errors returned by the server::
try:
resp = await svc.get_status(req)
except RpcError as e:
if e.code == StatusCode.NOT_FOUND:
print("Service not found")
elif e.code == StatusCode.CONTRACT_VIOLATION:
print("Wire shape doesn't match the contract")
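One practical use of the category split is a retry policy keyed on the code. The sketch below copies the relevant values locally (taken from the enum above) and treats only transient categories as retryable; the policy itself is illustrative, not one shipped by Aster:

```python
from enum import IntEnum

# Local copy of a few codes; values match the StatusCode enum above.
class StatusCode(IntEnum):
    DEADLINE_EXCEEDED = 4
    RESOURCE_EXHAUSTED = 8
    UNAVAILABLE = 14
    CONTRACT_VIOLATION = 101

# Hypothetical policy: transient failures are worth retrying; a
# contract-shape failure is deterministic, so retrying cannot help.
RETRYABLE = {
    StatusCode.DEADLINE_EXCEEDED,
    StatusCode.RESOURCE_EXHAUSTED,
    StatusCode.UNAVAILABLE,
}

def should_retry(code: StatusCode) -> bool:
    return code in RETRYABLE

print(should_retry(StatusCode.UNAVAILABLE))         # transient
print(should_retry(StatusCode.CONTRACT_VIOLATION))  # fix the payload instead
```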
class AdmissionDeniedError(PermissionError):
    """Raised when a consumer is refused by the server's admission check.

    The server never reveals *why* admission failed (no oracle leak), so this
    exception enumerates the common causes as a hint to the user rather than
    a precise diagnosis.
    """

    def __init__(
        self,
        *,
        had_credential: bool,
        credential_file: str | None,
        our_endpoint_id: str,
        server_address: str,
    ) -> None:
        self.had_credential = had_credential
        self.credential_file = credential_file
        self.our_endpoint_id = our_endpoint_id
        self.server_address = server_address
        super().__init__(self.format_hint())

    def format_hint(self) -> str:
        """Return a multi-line actionable hint suitable for CLI output."""
        short_id = (self.our_endpoint_id[:16] + "...") if self.our_endpoint_id else "<unknown>"
        if not self.had_credential:
            return (
                "consumer admission denied -- this server requires a credential.\n"
                "  - Get an enrollment credential file (.cred) from the server's operator.\n"
                "  - Then retry with: --rcan <path/to/file.cred>\n"
                "    (or set ASTER_ENROLLMENT_CREDENTIAL=<path> in the environment)"
            )
        cred_label = self.credential_file or "<credential>"
        return (
            f"consumer admission denied -- the server rejected your credential.\n"
            f"  credential: {cred_label}\n"
            f"  your node: {short_id}\n"
            "  Common causes:\n"
            "    1. The credential expired (check the 'Expires' field on the file).\n"
            "    2. The credential was issued to a DIFFERENT node. Credentials are\n"
            "       bound to a single endpoint id: if you copied this file from\n"
            "       another machine/process, the server sees a different node id\n"
            "       and refuses admission. Ask the operator to re-issue it for\n"
            f"       endpoint_id={short_id}.\n"
            "    3. The server trusts a different root key than the one that signed\n"
            "       this credential.\n"
            "    4. The credential's role/capabilities don't match this server's\n"
            "       policy (the server may reject unknown capabilities outright)."
        )
Raised when a consumer is refused by the server's admission check.
The server never reveals why admission failed (no oracle leak), so this exception enumerates the common causes as a hint to the user rather than a precise diagnosis.
format_hint() -- Return a multi-line actionable hint suitable for CLI output.
```python
class ContractViolationError(RpcError):
    """Raised when the wire payload doesn't match the published contract.

    Carries the offending field names in ``details`` under the key
    ``unexpected_fields`` (comma-separated, sanitized via repr).
    """

    def __init__(self, message: str = "", details: dict[str, str] | None = None) -> None:
        super().__init__(StatusCode.CONTRACT_VIOLATION, message, details)
```
Raised when the wire payload doesn't match the published contract.
Carries the offending field names in details under the key unexpected_fields (comma-separated, sanitized via repr).
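For illustration, a hedged sketch of how a codec could build that details entry. The encode_unexpected_fields helper is hypothetical, not part of the aster API; it only demonstrates the comma-separated, repr-sanitized shape the docstring describes:

```python
def encode_unexpected_fields(fields: list[str]) -> dict[str, str]:
    # Hypothetical helper: comma-separated field names, each sanitized
    # via repr, matching the details shape described above.
    return {"unexpected_fields": ",".join(repr(f) for f in fields)}
```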
```python
@dataclass
class CallContext:
    """Context for a single RPC call, available to interceptors and handlers.

    Passed to every interceptor in the chain. Read ``service`` and ``method``
    to know which RPC is being called. Use ``metadata`` to pass headers
    between client and server. Check ``remaining_seconds`` for deadline
    awareness.

    Attributes:
        service: The service name (e.g., ``"MissionControl"``).
        method: The method name (e.g., ``"getStatus"``).
        call_id: Unique ID for this call (auto-generated UUID).
        peer: Remote peer identifier (endpoint ID hex).
        metadata: Key/value headers sent with the call.
        attributes: Enrollment attributes from the consumer's credential.
        deadline: Absolute deadline as epoch timestamp, or ``None``.
        is_streaming: ``True`` for streaming RPC patterns.
        pattern: RPC pattern (``"unary"``, ``"server_stream"``, etc.).
        idempotent: ``True`` if the method is safe to retry.
        attempt: Current retry attempt number (starts at 1).
    """

    service: str
    method: str
    call_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    session_id: str | None = None
    peer: str | None = None
    metadata: dict[str, str] = field(default_factory=dict)
    attributes: dict[str, str] = field(default_factory=dict)
    deadline: float | None = None
    is_streaming: bool = False
    pattern: str | None = None
    idempotent: bool = False
    attempt: int = 1

    @property
    def remaining_seconds(self) -> float | None:
        if self.deadline is None:
            return None
        return max(0.0, self.deadline - time.time())

    @property
    def expired(self) -> bool:
        remaining = self.remaining_seconds
        return remaining is not None and remaining <= 0.0
```
Context for a single RPC call, available to interceptors and handlers.
Passed to every interceptor in the chain. Read service and method
to know which RPC is being called. Use metadata to pass headers
between client and server. Check remaining_seconds for deadline
awareness.
Attributes:
service: The service name (e.g., "MissionControl").
method: The method name (e.g., "getStatus").
call_id: Unique ID for this call (auto-generated UUID).
peer: Remote peer identifier (endpoint ID hex).
metadata: Key/value headers sent with the call.
attributes: Enrollment attributes from the consumer's credential.
deadline: Absolute deadline as epoch timestamp, or None.
is_streaming: True for streaming RPC patterns.
pattern: RPC pattern ("unary", "server_stream", etc.).
idempotent: True if the method is safe to retry.
attempt: Current retry attempt number (starts at 1).
```python
class Interceptor(ABC):
    """Base interceptor interface."""

    async def on_request(self, ctx: CallContext, request: object) -> object:
        return request

    async def on_response(self, ctx: CallContext, response: object) -> object:
        return response

    async def on_error(self, ctx: CallContext, error: RpcError) -> RpcError | None:
        return error
```
Base interceptor interface.
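To write your own interceptor, override only the hooks you need. A minimal sketch, using a local stand-in for the base class so the snippet runs without aster installed (the real ctx would be a CallContext):

```python
import asyncio
from types import SimpleNamespace

class Interceptor:
    # Local stand-in mirroring the base class shown above.
    async def on_request(self, ctx, request):
        return request
    async def on_response(self, ctx, response):
        return response

class HeaderInterceptor(Interceptor):
    """Adds a static header to every outgoing call (illustrative name)."""

    def __init__(self, key: str, value: str) -> None:
        self._key, self._value = key, value

    async def on_request(self, ctx, request):
        # Don't overwrite a header the caller already set.
        ctx.metadata.setdefault(self._key, self._value)
        return request

ctx = SimpleNamespace(metadata={})
asyncio.run(HeaderInterceptor("x-tenant", "acme").on_request(ctx, object()))
```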
```python
class DeadlineInterceptor(Interceptor):
    """Validates and enforces call deadlines.

    Args:
        skew_tolerance_ms: Milliseconds of clock-skew tolerance added to the
            deadline when checking on receipt. A request whose deadline has
            already passed by more than this tolerance is rejected immediately
            with ``DEADLINE_EXCEEDED``. Defaults to 5000 ms (5 seconds).
    """

    def __init__(self, skew_tolerance_ms: int = 5000) -> None:
        self._skew_tolerance_ms = skew_tolerance_ms

    async def on_request(self, ctx: CallContext, request: object) -> object:
        if ctx.deadline is not None:
            now_epoch_ms = int(time.time() * 1000)
            deadline_epoch_ms = int(ctx.deadline * 1000)
            # Reject on receipt if expired beyond skew tolerance (S6.8.1)
            if now_epoch_ms > deadline_epoch_ms + self._skew_tolerance_ms:
                raise RpcError(
                    StatusCode.DEADLINE_EXCEEDED,
                    "deadline already expired on receipt "
                    f"(now={now_epoch_ms}, deadline={deadline_epoch_ms}, "
                    f"skew_tolerance={self._skew_tolerance_ms}ms)",
                )
            # Standard expiry check (no tolerance)
            if ctx.expired:
                raise RpcError(StatusCode.DEADLINE_EXCEEDED, "deadline exceeded")
        return request

    def timeout_seconds(self, ctx: CallContext) -> float | None:
        """Return remaining seconds until deadline, or None if no deadline set."""
        remaining = ctx.remaining_seconds
        if remaining is None:
            return None
        return max(0.0, remaining)
```
Validates and enforces call deadlines.
Args:
skew_tolerance_ms: Milliseconds of clock-skew tolerance added to the
deadline when checking on receipt. A request whose deadline has
already passed by more than this tolerance is rejected immediately
with DEADLINE_EXCEEDED. Defaults to 5000 ms (5 seconds).
timeout_seconds() -- Return remaining seconds until deadline, or None if no deadline set.
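The receipt-time check boils down to a single comparison. A standalone mirror of that arithmetic:

```python
def rejected_on_receipt(now_epoch_ms: int, deadline_epoch_ms: int,
                        skew_tolerance_ms: int = 5000) -> bool:
    # Mirrors the first check in DeadlineInterceptor.on_request: the request
    # is refused outright only once the deadline is past by more than the
    # configured clock-skew tolerance.
    return now_epoch_ms > deadline_epoch_ms + skew_tolerance_ms
```

With the default 5000 ms tolerance, a deadline 3 seconds in the past is still admitted (and then caught by the ordinary expiry check), while one 6 seconds in the past is rejected on receipt.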
```python
class AuthInterceptor(Interceptor):
    """Injects and/or validates auth metadata."""

    def __init__(
        self,
        *,
        token_provider: str | Callable[[], str] | None = None,
        validator: str | Callable[[str], bool] | None = None,
        metadata_key: str = "authorization",
        scheme: str | None = "Bearer",
    ) -> None:
        self._token_provider = token_provider
        self._validator = validator
        self._metadata_key = metadata_key
        self._scheme = scheme

    async def on_request(self, ctx: CallContext, request: object) -> object:
        if self._token_provider is not None and self._metadata_key not in ctx.metadata:
            token = self._token_provider() if callable(self._token_provider) else self._token_provider
            ctx.metadata[self._metadata_key] = (
                f"{self._scheme} {token}" if self._scheme else str(token)
            )

        if self._validator is not None:
            raw_token = ctx.metadata.get(self._metadata_key, "")
            token = raw_token
            prefix = f"{self._scheme} " if self._scheme else ""
            if prefix and raw_token.startswith(prefix):
                token = raw_token[len(prefix):]
            valid = self._validator(token) if callable(self._validator) else token == self._validator
            if not valid:
                raise RpcError(StatusCode.UNAUTHENTICATED, "authentication failed")

        return request
```
Injects and/or validates auth metadata.
```python
class RetryInterceptor(Interceptor):
    """Provides retry policy hints for client calls."""

    def __init__(
        self,
        policy: RetryPolicy | None = None,
        retryable_codes: set[StatusCode] | None = None,
    ) -> None:
        self.policy = policy or RetryPolicy()
        self.retryable_codes = retryable_codes or {StatusCode.UNAVAILABLE}

    def should_retry(self, ctx: CallContext, error: RpcError) -> bool:
        return ctx.idempotent and error.code in self.retryable_codes

    def backoff_seconds(self, attempt: int) -> float:
        backoff = self.policy.backoff
        delay_ms = min(
            backoff.max_ms,
            int(backoff.initial_ms * (backoff.multiplier ** max(0, attempt - 1))),
        )
        jitter = delay_ms * backoff.jitter * random.random()
        return (delay_ms + jitter) / 1000.0
```
Provides retry policy hints for client calls.
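The backoff grows geometrically and is capped at max_ms; with jitter at zero it is deterministic. A standalone mirror of the formula (the values here -- 100 ms initial, 2.0 multiplier, 5000 ms cap -- are illustrative, not the library's documented RetryPolicy defaults):

```python
def backoff_seconds(attempt: int, initial_ms: int = 100,
                    multiplier: float = 2.0, max_ms: int = 5000) -> float:
    # Mirrors RetryInterceptor.backoff_seconds with jitter disabled.
    delay_ms = min(max_ms, int(initial_ms * (multiplier ** max(0, attempt - 1))))
    return delay_ms / 1000.0

schedule = [backoff_seconds(a) for a in range(1, 8)]
# 0.1, 0.2, 0.4, 0.8, 1.6, 3.2, then capped at 5.0
```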
```python
class CircuitBreakerInterceptor(Interceptor):
    """Simple CLOSED → OPEN → HALF_OPEN circuit breaker."""

    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

    def __init__(
        self,
        *,
        failure_threshold: int = 3,
        recovery_timeout: float = 5.0,
        half_open_max_calls: int = 1,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_calls = half_open_max_calls
        self.state = self.CLOSED
        self.failure_count = 0
        self.opened_at = 0.0
        self.half_open_calls = 0

    def before_call(self, ctx: CallContext) -> None:
        now = time.monotonic()
        if self.state == self.OPEN:
            if now - self.opened_at >= self.recovery_timeout:
                self.state = self.HALF_OPEN
                self.half_open_calls = 0
            else:
                raise RpcError(StatusCode.UNAVAILABLE, "circuit breaker is open")

        if self.state == self.HALF_OPEN:
            if self.half_open_calls >= self.half_open_max_calls:
                raise RpcError(StatusCode.UNAVAILABLE, "circuit breaker is half-open")
            self.half_open_calls += 1

    def record_success(self) -> None:
        self.failure_count = 0
        self.half_open_calls = 0
        self.state = self.CLOSED

    def record_failure(self, error: RpcError) -> None:
        if error.code not in {StatusCode.UNAVAILABLE, StatusCode.INTERNAL, StatusCode.UNKNOWN}:
            return
        if self.state == self.HALF_OPEN:
            self.state = self.OPEN
            self.opened_at = time.monotonic()
            self.half_open_calls = 0
            return
        self.failure_count += 1
        if self.failure_count >= self.failure_threshold:
            self.state = self.OPEN
            self.opened_at = time.monotonic()
```
Simple CLOSED → OPEN → HALF_OPEN circuit breaker.
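The state machine is easiest to see in a trimmed sketch that drops the RPC error types and keeps only the transitions: consecutive failures trip the breaker to open, and after the cool-down the next call probes in half_open:

```python
import time

class Breaker:
    # Trimmed sketch of the CLOSED -> OPEN -> HALF_OPEN transitions above.
    def __init__(self, failure_threshold: int = 3, recovery_timeout: float = 5.0):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state, self.failure_count, self.opened_at = "closed", 0, 0.0

    def record_failure(self) -> None:
        self.failure_count += 1
        if self.failure_count >= self.failure_threshold:
            self.state, self.opened_at = "open", time.monotonic()

    def allow(self) -> bool:
        # After the cool-down, let one probe call through in half_open.
        if self.state == "open" and time.monotonic() - self.opened_at >= self.recovery_timeout:
            self.state = "half_open"
        return self.state != "open"

b = Breaker(failure_threshold=3, recovery_timeout=0.0)
for _ in range(3):
    b.record_failure()
# three consecutive failures trip the breaker to "open"
```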
```python
class AuditLogInterceptor(Interceptor):
    """Captures structured audit events for requests, responses, and errors."""

    def __init__(self, *, sink: list[dict[str, Any]] | None = None, logger: logging.Logger | None = None) -> None:
        self.sink = sink if sink is not None else []
        self.logger = logger or logging.getLogger(__name__)

    def _record(self, event: str, ctx: CallContext, **extra: Any) -> None:
        entry = {
            "event": event,
            "service": ctx.service,
            "method": ctx.method,
            "call_id": ctx.call_id,
            "attempt": ctx.attempt,
            "ts": time.time(),
            **extra,
        }
        self.sink.append(entry)
        self.logger.debug("audit=%s", entry)

    async def on_request(self, ctx: CallContext, request: object) -> object:
        self._record("request", ctx)
        return request

    async def on_response(self, ctx: CallContext, response: object) -> object:
        self._record("response", ctx)
        return response

    async def on_error(self, ctx: CallContext, error: RpcError) -> RpcError | None:
        self._record("error", ctx, code=error.code.name, message=error.message)
        return error
```
Captures structured audit events for requests, responses, and errors.
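Each audit event is a flat dict appended to the sink. A standalone mirror of the entry shape built by _record:

```python
import time

def audit_entry(event: str, service: str, method: str,
                call_id: str, attempt: int, **extra) -> dict:
    # Mirrors AuditLogInterceptor._record's entry layout: fixed keys for
    # the call identity, a timestamp, plus any event-specific extras
    # (e.g. code/message for errors).
    return {
        "event": event,
        "service": service,
        "method": method,
        "call_id": call_id,
        "attempt": attempt,
        "ts": time.time(),
        **extra,
    }

entry = audit_entry("error", "Greeter", "greet", "call-1", 2, code="UNAVAILABLE")
```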
```python
class MetricsInterceptor(Interceptor):
    """Collects RED metrics and creates OTel spans for each RPC call.

    Metrics collected:
    - ``aster.rpc.started`` -- counter, labels: service, method, pattern
    - ``aster.rpc.completed`` -- counter, labels: service, method, status
    - ``aster.rpc.duration`` -- histogram (seconds), labels: service, method

    Tracing:
    - One span per RPC call: ``{service}/{method}``
    - Span attributes: rpc.service, rpc.method, rpc.system, rpc.status_code

    Falls back to simple in-memory counters when OTel is not installed.
    """

    def __init__(self) -> None:
        # In-memory fallback counters (always available)
        self.started = 0
        self.succeeded = 0
        self.failed = 0

        # Try to set up OTel
        self._tracer: Any = None
        self._meter: Any = None
        self._started_counter: Any = None
        self._completed_counter: Any = None
        self._duration_histogram: Any = None

        try:
            from opentelemetry import trace, metrics  # type: ignore

            self._tracer = trace.get_tracer("aster.rpc", "0.2.0")
            self._meter = metrics.get_meter("aster.rpc", "0.2.0")

            self._started_counter = self._meter.create_counter(
                "aster.rpc.started",
                description="Total RPC calls started",
                unit="1",
            )
            self._completed_counter = self._meter.create_counter(
                "aster.rpc.completed",
                description="Total RPC calls completed",
                unit="1",
            )
            self._duration_histogram = self._meter.create_histogram(
                "aster.rpc.duration",
                description="RPC call duration",
                unit="s",
            )
        except Exception:
            pass  # OTel not installed -- use fallback counters

        # Track start times per call_id for duration calculation
        self._call_starts: dict[str, float] = {}

    @property
    def has_otel(self) -> bool:
        """Whether OpenTelemetry is available and configured."""
        return self._tracer is not None

    async def on_request(self, ctx: CallContext, request: object) -> object:
        self.started += 1

        labels = {
            "service": ctx.service,
            "method": ctx.method,
            "pattern": ctx.pattern or "unary",
        }

        # OTel counter
        if self._started_counter:
            self._started_counter.add(1, labels)

        # Start timing
        call_key = f"{ctx.service}.{ctx.method}.{id(request)}"
        self._call_starts[call_key] = time.monotonic()

        # Start OTel span
        if self._tracer:
            from opentelemetry import trace  # type: ignore

            span = self._tracer.start_span(
                f"{ctx.service}/{ctx.method}",
                kind=trace.SpanKind.SERVER,
                attributes={
                    "rpc.system": "aster",
                    "rpc.service": ctx.service,
                    "rpc.method": ctx.method,
                    "rpc.aster.pattern": ctx.pattern or "unary",
                    "rpc.aster.idempotent": ctx.idempotent,
                },
            )
            # Store span on context for on_response/on_error
            ctx._otel_span = span  # type: ignore[attr-defined]
            ctx._otel_call_key = call_key  # type: ignore[attr-defined]

        # Set correlation ID for structured logging
        from aster.logging import set_request_id
        set_request_id(ctx.call_id or f"{ctx.service}.{ctx.method}")

        return request

    async def on_response(self, ctx: CallContext, response: object) -> object:
        self.succeeded += 1

        labels = {"service": ctx.service, "method": ctx.method, "status": "OK"}

        if self._completed_counter:
            self._completed_counter.add(1, labels)

        # Record duration
        call_key = getattr(ctx, "_otel_call_key", None)
        if call_key and call_key in self._call_starts:
            duration = time.monotonic() - self._call_starts.pop(call_key)
            if self._duration_histogram:
                self._duration_histogram.record(
                    duration, {"service": ctx.service, "method": ctx.method}
                )

        # End OTel span
        span = getattr(ctx, "_otel_span", None)
        if span:
            from opentelemetry.trace import StatusCode as OtelStatus  # type: ignore
            span.set_status(OtelStatus.OK)
            span.end()

        return response

    async def on_error(self, ctx: CallContext, error: RpcError) -> RpcError | None:
        self.failed += 1

        labels = {
            "service": ctx.service,
            "method": ctx.method,
            "status": error.code.name if hasattr(error.code, "name") else str(error.code),
        }

        if self._completed_counter:
            self._completed_counter.add(1, labels)

        # Record duration
        call_key = getattr(ctx, "_otel_call_key", None)
        if call_key and call_key in self._call_starts:
            duration = time.monotonic() - self._call_starts.pop(call_key)
            if self._duration_histogram:
                self._duration_histogram.record(
                    duration, {"service": ctx.service, "method": ctx.method}
                )

        # End OTel span with error
        span = getattr(ctx, "_otel_span", None)
        if span:
            from opentelemetry.trace import StatusCode as OtelStatus  # type: ignore
            span.set_status(OtelStatus.ERROR, str(error.message))
            span.set_attribute("rpc.aster.error_code", str(error.code))
            span.record_exception(error)
            span.end()

        return error

    def snapshot(self) -> dict[str, int]:
        """Return a snapshot of in-memory counters."""
        return {
            "started": self.started,
            "succeeded": self.succeeded,
            "failed": self.failed,
            "in_flight": self.started - self.succeeded - self.failed,
        }
```
Collects RED metrics and creates OTel spans for each RPC call.
Metrics collected:
- aster.rpc.started -- counter, labels: service, method, pattern
- aster.rpc.completed -- counter, labels: service, method, status
- aster.rpc.duration -- histogram (seconds), labels: service, method
Tracing:
- One span per RPC call: {service}/{method}
- Span attributes: rpc.service, rpc.method, rpc.system, rpc.status_code
Falls back to simple in-memory counters when OTel is not installed.
has_otel -- Whether OpenTelemetry is available and configured.
snapshot() -- Return a snapshot of in-memory counters.
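Note that in_flight is derived rather than stored. A standalone mirror of the snapshot layout makes the invariant explicit:

```python
def snapshot(started: int, succeeded: int, failed: int) -> dict[str, int]:
    # Mirrors MetricsInterceptor.snapshot: in-flight calls are whatever
    # started but has not yet completed either way.
    return {
        "started": started,
        "succeeded": succeeded,
        "failed": failed,
        "in_flight": started - succeeded - failed,
    }
```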