Mastra A2A subagents#16348
Conversation
Add an experimental A2AAgent to @mastra/core/a2a with generate, resumeGenerate, stream, and resumeStream methods for remote A2A agents. Split the browser-safe shared A2A surface into @mastra/core/a2a/client so client-js can keep using shared protocol types and errors without pulling in the Node-only agent runtime. Also adds focused tests around card discovery, live streaming behavior, and resumable task flows.
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughThis pull request introduces ChangesA2AAgent Remote Execution with Client Export
Estimated code review effort🎯 4 (Complex) | ⏱️ ~75 minutes 🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Tip 💬 Introducing Slack Agent: The best way for teams to turn conversations into code.Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.
Built for teams:
One agent for your entire SDLC. Right inside Slack. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
🦋 Changeset detectedLatest commit: 2c268d4 The changes in this PR will be included in the next version bump. This PR includes changesets to release 23 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (4)
.changeset/whole-papers-throw.md (1)
7-12: ⚡ Quick winRefocus "What changed" on developer outcomes rather than implementation details.
Lines 10-12 describe internal architecture ("in-memory caching", "typed result objects for wrappers", "dedicated subpath for browser-safe shared types") rather than what developers gain. Changeset descriptions should highlight capabilities and impact, not how features are implemented internally.
Consider rephrasing to emphasize outcomes:
- Line 10: What can developers do with card verification? (e.g., "Verify Agent Card signatures using custom key providers")
- Line 11: What do the result types enable? (Skip if this is purely internal)
- Line 12: What does the client subpath unlock? (e.g., "Import A2A types in browser environments via
@mastra/core/a2a/client")Also consider varying the sentence structure beyond "Added..." to improve scannability.
As per coding guidelines: "Highlight outcomes! What does change for the end user? Do not focus on internal implementation details."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In @.changeset/whole-papers-throw.md around lines 7 - 12, Update the "What changed" section to emphasize developer-facing outcomes instead of implementation details: for the A2AAgent entry mention the new developer capabilities (e.g., "Remote A2A execution via generate, resumeGenerate, stream, and resumeStream" so devs know what they can do), rephrase the Agent Card line to describe the outcome (e.g., "Verify Agent Card signatures with pluggable key providers" rather than "in-memory caching"), and rewrite the client subpath line to state the benefit (e.g., "Import A2A types in browser environments via `@mastra/core/a2a/client`"); vary sentence starts away from repeated "Added..." and prefer short outcome-focused bullets.packages/core/src/a2a/a2a-agent.ts (3)
1005-1013: 🏗️ Heavy lift
as unknown as A2AAgentStreamResultandAsyncIterable<any>defeat the public type contract.The two
as unknown as A2AAgentStreamResultcasts (lines 1002 and 1338) plusAsyncIterable<any>on line 1013 indicate the assembled object doesn't structurally matchA2AAgentStreamResult. Strict TypeScript will silently let you drift from the contract here — e.g., renaming a property onA2AAgentStreamResultwon't error in this file. Prefer defining a discriminated event union for stream chunks (text-start | text-delta | text-end | tool-call-suspended | finish) and constructing a value that satisfiesA2AAgentStreamResultwithoutunknowncasts. As per coding guidelines, "Packages must use strict TypeScript".Also applies to: 1002-1002, 1338-1338
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/core/src/a2a/a2a-agent.ts` around lines 1005 - 1013, The stream helper `#streamEvents` is currently typed as AsyncIterable<any> and uses two "as unknown as A2AAgentStreamResult" casts which bypass the A2AAgentStreamResult contract; replace the casts and the loose AsyncIterable<any> by defining a discriminated union type for stream events (e.g., "text-start" | "text-delta" | "text-end" | "tool-call-suspended" | "finish") and update `#streamEvents` to yield that union so each yielded object structurally matches A2AAgentStreamResult; remove the two casts (the ones wrapping assembled chunk objects) and construct objects that satisfy A2AAgentStreamResult directly, updating any helper/emit code in the same file to use the new discriminant and precise types.
917-1003: ⚖️ Poor tradeoffDuplicated SSE parsing across
#streamEventsand#collectStreamEvents.Both functions independently parse the same
tee()-d byte stream and reconstruct task/artifact/suspended state, with slightly different semantics (e.g.,#streamEventsalways emitscreateResumeSchema()fortool-call-suspendedon line 1132, while#collectStreamEventscarries the capturedresumeSchemafrom line 1198/1243; messages overwritetextBufferin the accumulator but append text-delta chunks on the consumer side). Consider parsing once into a shared internal event stream and deriving both the emitted chunks and the final aggregate from it. Not blocking, but it will drift if either path is touched.Also applies to: 1149-1276
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/core/src/a2a/a2a-agent.ts` around lines 917 - 1003, Both `#streamEvents` and `#collectStreamEvents` duplicate SSE parsing and reconstruct state differently, causing drift; extract the shared parsing logic into a single internal parser (e.g., new private method `#parseA2AStreamEvents`) that consumes the tee'd byte stream and yields canonical event objects (message-delta, task, artifact, suspended with resumeSchema). Have `#streamEvents` and `#collectStreamEvents` consume that canonical event async-iterator (or shared EventEmitter/Readable) to implement their distinct behaviors (streamed deltas vs. aggregated final state), remove duplicated parsing and the ad-hoc createResumeSchema() call so resumeSchema from the parsed suspended event is preserved, and update `#consumeA2AStream` to wire the parser output into both the consumerStream path and the accumulator path.
1354-1389: ⚡ Quick winRetry loop treats every error as transient, including non-retryable 4xx.
When
retries > 0, the catch block retries on any thrown error, including theMastraA2AError.invalidAgentResponse(...)raised at line 1369 for HTTP 4xx responses (400,401,403,404,422, ...). Those statuses won't succeed on retry, so this just amplifies bad requests and authn/authz failures while also delaying the failure surfaced to the caller. Consider classifying errors so only network failures,408,429, and5xxare retried.♻️ Sketch
} catch (error) { lastError = error; + const status = + error && typeof error === 'object' && 'status' in error && typeof (error as { status?: unknown }).status === 'number' + ? (error as { status: number }).status + : undefined; + const retryable = + status === undefined || status === 408 || status === 429 || status >= 500; + + if (!retryable) { + throw error; + } + if (attempts === this.#retries) { break; }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/core/src/a2a/a2a-agent.ts` around lines 1354 - 1389, The retry loop in the A2A request method retries on every error (including MastraA2AError.invalidAgentResponse for 4xx), so change the catch in the method that calls this.#fetch (the retry loop around `#fetch` in a2a-agent.ts) to only retry transient errors: network/fetch failures (e.g., TypeError/AbortError), HTTP 408, 429, or any 5xx status. Specifically, detect when the caught error is a MastraA2AError (or an error object carrying a status) and if it has a numeric status that is not 408, 429, or >=500 rethrow immediately (do not increment attempts or delay); otherwise treat it as transient and continue with the existing attempts increment and `#delay` logic. Ensure you still assign lastError before rethrowing so the final throw at the end behaves the same for non-retriable failures.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@packages/core/src/a2a/a2a-agent.test.ts`:
- Around line 572-576: The wall-clock assertion using startedAt/elapsedMs is
flaky; instead make the test assert that agent.stream(...) resolves before the
SSE completion signal by racing the stream call against a watchdog/deferred that
you resolve only after the mock SSE enqueues the final chunk. Replace the
elapsedMs expect with a Promise.race pattern: start the agent.stream('Live
stream please', { runId: 'stream-run-live' }) and race it against a deferred (or
Promise) that is resolved by the SSE mock at the moment the last chunk is
enqueued; then assert that the stream promise wins the race. Reference the
existing agent.stream call and the mock SSE completion event to implement the
deferred/watchdog.
In `@packages/core/src/a2a/a2a-agent.ts`:
- Around line 112-134: The parseEventBlock function currently calls
JSON.parse(payload) without protection, so a malformed SSE chunk will throw and
abort streaming; wrap the JSON.parse(payload) call in a try/catch inside
parseEventBlock (and keep the existing payload/to-result logic) so parsing
errors are caught, log or swallow the error, and return an empty result (e.g.,
{}) to signal "skip this frame" instead of throwing; this ensures callers like
collectStreamEvents/streamEvents continue processing subsequent valid events
while preserving handling for the '[DONE]' case and JSONRPCResponse result
extraction.
---
Nitpick comments:
In @.changeset/whole-papers-throw.md:
- Around line 7-12: Update the "What changed" section to emphasize
developer-facing outcomes instead of implementation details: for the A2AAgent
entry mention the new developer capabilities (e.g., "Remote A2A execution via
generate, resumeGenerate, stream, and resumeStream" so devs know what they can
do), rephrase the Agent Card line to describe the outcome (e.g., "Verify Agent
Card signatures with pluggable key providers" rather than "in-memory caching"),
and rewrite the client subpath line to state the benefit (e.g., "Import A2A
types in browser environments via `@mastra/core/a2a/client`"); vary sentence
starts away from repeated "Added..." and prefer short outcome-focused bullets.
In `@packages/core/src/a2a/a2a-agent.ts`:
- Around line 1005-1013: The stream helper `#streamEvents` is currently typed as
AsyncIterable<any> and uses two "as unknown as A2AAgentStreamResult" casts which
bypass the A2AAgentStreamResult contract; replace the casts and the loose
AsyncIterable<any> by defining a discriminated union type for stream events
(e.g., "text-start" | "text-delta" | "text-end" | "tool-call-suspended" |
"finish") and update `#streamEvents` to yield that union so each yielded object
structurally matches A2AAgentStreamResult; remove the two casts (the ones
wrapping assembled chunk objects) and construct objects that satisfy
A2AAgentStreamResult directly, updating any helper/emit code in the same file to
use the new discriminant and precise types.
- Around line 917-1003: Both `#streamEvents` and `#collectStreamEvents` duplicate
SSE parsing and reconstruct state differently, causing drift; extract the shared
parsing logic into a single internal parser (e.g., new private method
`#parseA2AStreamEvents`) that consumes the tee'd byte stream and yields canonical
event objects (message-delta, task, artifact, suspended with resumeSchema). Have
`#streamEvents` and `#collectStreamEvents` consume that canonical event
async-iterator (or shared EventEmitter/Readable) to implement their distinct
behaviors (streamed deltas vs. aggregated final state), remove duplicated
parsing and the ad-hoc createResumeSchema() call so resumeSchema from the parsed
suspended event is preserved, and update `#consumeA2AStream` to wire the parser
output into both the consumerStream path and the accumulator path.
- Around line 1354-1389: The retry loop in the A2A request method retries on
every error (including MastraA2AError.invalidAgentResponse for 4xx), so change
the catch in the method that calls this.#fetch (the retry loop around `#fetch` in
a2a-agent.ts) to only retry transient errors: network/fetch failures (e.g.,
TypeError/AbortError), HTTP 408, 429, or any 5xx status. Specifically, detect
when the caught error is a MastraA2AError (or an error object carrying a status)
and if it has a numeric status that is not 408, 429, or >=500 rethrow
immediately (do not increment attempts or delay); otherwise treat it as
transient and continue with the existing attempts increment and `#delay` logic.
Ensure you still assign lastError before rethrowing so the final throw at the
end behaves the same for non-retriable failures.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 007957f3-1748-4cf9-a828-dac99cfa6e7a
📒 Files selected for processing (11)
.changeset/whole-papers-throw.mdclient-sdks/client-js/src/resources/a2a.test.tsclient-sdks/client-js/src/resources/a2a.tsclient-sdks/client-js/src/utils/process-a2a-stream.tspackages/core/package.jsonpackages/core/src/a2a/a2a-agent.test.tspackages/core/src/a2a/a2a-agent.tspackages/core/src/a2a/client.tspackages/core/src/a2a/index.tspackages/core/src/a2a/types.tspackages/core/tsup.config.ts
Keep subagent badge content expanded while live and hydrated subagent output is rendered. This avoids hiding nested tool and text streams before the parent response completes.
Wrap A2AAgent stream chunks with the same runId and from=AGENT envelope used by normal subagents. This lets parent agent streams and playground rendering treat remote A2A text deltas as live nested subagent output.
|
No dependency changes detected. Learn more about Socket for GitHub. 👍 No dependency changes detected in pull request |
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
packages/playground/src/lib/ai-ui/tools/badges/agent-badge.tsx (1)
34-34:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winRemove unused
isCompleteprop from interface.The
isCompleteprop is defined inAgentBadgePropsbut is never destructured or referenced in the component implementation. Since line 92 now hardcodesinitialCollapsed={false}, this prop serves no purpose and should be removed to avoid confusion.🧹 Proposed fix
toolCalled?: boolean; - isComplete?: boolean; }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/playground/src/lib/ai-ui/tools/badges/agent-badge.tsx` at line 34, AgentBadgeProps currently defines an unused isComplete property; remove isComplete from the AgentBadgeProps interface and any related type references so the prop is no longer declared, and ensure the AgentBadge component continues to use the explicit initialCollapsed={false} (no other changes needed to AgentBadge or its JSX); update any callers only if they were passing isComplete to avoid unused prop warnings.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@packages/playground/src/lib/ai-ui/tools/badges/agent-badge.tsx`:
- Line 34: AgentBadgeProps currently defines an unused isComplete property;
remove isComplete from the AgentBadgeProps interface and any related type
references so the prop is no longer declared, and ensure the AgentBadge
component continues to use the explicit initialCollapsed={false} (no other
changes needed to AgentBadge or its JSX); update any callers only if they were
passing isComplete to avoid unused prop warnings.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 3204ef06-1135-4579-80f5-19978bc3e47d
📒 Files selected for processing (4)
packages/core/src/a2a/a2a-agent.test.tspackages/core/src/a2a/a2a-agent.tspackages/playground/src/lib/ai-ui/tools/__tests__/agent-badge-falsy-output-regression.test.tspackages/playground/src/lib/ai-ui/tools/badges/agent-badge.tsx
✅ Files skipped from review due to trivial changes (1)
- packages/playground/src/lib/ai-ui/tools/tests/agent-badge-falsy-output-regression.test.ts
🚧 Files skipped from review as they are similar to previous changes (2)
- packages/core/src/a2a/a2a-agent.ts
- packages/core/src/a2a/a2a-agent.test.ts
Update the A2AAgent changeset to describe remote A2A agents as Mastra subagents and show registration on a parent Agent.
This adds an experimental
A2AAgentto@mastra/core/a2aso a Mastra agent can use a remote A2A agent as a subagent.The main story here is registering a remote agent card URL on a parent agent and letting Mastra delegate to it through the normal subagent path.
A2AAgenthandles agent card discovery/bootstrap, remotegenerate/resumeGenerate, remotestream/resumeStream, suspend-resume payloads, optional agent card verification, and the browser-safe shared A2A imports used byclient-jsvia@mastra/core/a2a/client.I also tightened the stream path a bit while building this out: malformed SSE frames are skipped instead of aborting the run, non-transient 4xx responses are not retried, and the focused tests now cover the subagent path plus those stream edge cases.
ELI5
This PR lets one AI agent use another AI agent as a helper. Instead of doing all the work itself, an agent can send tasks to a remote AI agent and get back the results—kind of like asking a friend for help instead of doing something alone.
Changes Summary
This PR introduces an experimental A2AAgent implementation that enables Mastra agents to delegate work to remote A2A agents via a subagent pattern. The implementation includes comprehensive testing, stream robustness improvements, and a browser-safe client surface for shared types.
Core Implementation
@mastra/core/a2aimplementing theSubAgentinterfacegenerate(),resumeGenerate(),stream(), andresumeStream()methods for remote A2A execution over JSON-RPCStream Handling & Robustness
Browser-Safe Client Surface
@mastra/core/a2a/clientprovides browser-compatible types and error classesTesting & Validation
Build & Configuration
package.jsonexports map with new./a2a/clientsubpath (ESM and CJS targets with shared type definitions)src/a2a/client.tsto tsup build configurationType System Additions
New public types in
@mastra/core/a2a/types:A2AAgentOptions– configuration for agent card URL, verification, fetch, timeout, retry, and credential handlingA2AAgentVerificationOptions&A2AAgentCardVerificationContext– optional card verification hooksA2AAgentRunState– tracks run/task/context identifiers and card URLsA2AAgentResumePayload– suspend–resume state for multi-turn executionA2AAgentGenerateResult&A2AAgentStreamResult– typed results for generation and streamingClient-JS Updates
@mastra/core/a2a/clientinstead of@mastra/core/a2aUI Refinement
AgentBadgeno longer defaults badge to collapsed state; now consistently renders open for both live and hydrated subagent content