Skip to content

Commit 647019e

Browse files
committed
Deslop functional.py fix commit
- Remove dead instrumentation added in the prior commit that was never consumed: `RunContext._observed_step_names`, `RunContext._record_observed_step`, `FunctionalWorkflow._runtime_step_names`, and `FunctionalWorkflowAgent._extra_kwargs`. The signature hash relies on `co_code` alone, which covers the attribute-access case without the collection-scaffolding. - Trim over-explanatory comments that restated what the code does or what it no longer does. Keep only the comments that answer "why" for the non-obvious bits (deterministic id contract, defensive deepcopy, stale replay guard). - Compress the `_compute_signature_hash` and FunctionalWorkflow `__init__` block docstrings without losing the user-facing reasoning. Net -49 lines. Regression lock preserved (766 passed, 1 skipped, 2 xfailed).
1 parent 02f2cc3 commit 647019e

1 file changed

Lines changed: 26 additions & 75 deletions

File tree

python/packages/core/agent_framework/_workflows/_functional.py

Lines changed: 26 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -181,8 +181,6 @@ def __init__(
181181

182182
# User state (simple dict)
183183
self._state: dict[str, Any] = {}
184-
# Step names actually invoked during this run (for signature hashing)
185-
self._observed_step_names: set[str] = set()
186184

187185
# Callback invoked after each step completes (set by FunctionalWorkflow)
188186
self._on_step_completed: Callable[[], Awaitable[None]] | None = None
@@ -228,21 +226,14 @@ async def request_info(
228226
(not visible to workflow authors).
229227
"""
230228
if request_id is None:
231-
# Deterministic auto-ID: relies on the workflow function being
232-
# deterministic w.r.t. call order (same contract as @step caching).
233-
# Using a fresh uuid here would break HITL resume because the
234-
# replayed call would generate a new id and never find the
235-
# caller-supplied response.
229+
# Deterministic id; same determinism contract as @step caching.
236230
rid = f"auto::{self._auto_request_info_index}"
237231
self._auto_request_info_index += 1
238232
else:
239233
rid = request_id
240234

241-
# Check if we already have a response for this request
242235
found, value = self._get_response(rid)
243236
if found:
244-
# Response already satisfied on a prior round — do not re-emit as
245-
# a pending request on the current checkpoint.
246237
self._pending_requests.pop(rid, None)
247238
return value
248239

@@ -324,15 +315,6 @@ def is_streaming(self) -> bool:
324315
def _get_events(self) -> list[WorkflowEvent[Any]]:
325316
return list(self._events)
326317

327-
def _record_observed_step(self, name: str) -> None:
328-
"""Record a step name actually invoked during this run.
329-
330-
Used by :meth:`FunctionalWorkflow._compute_signature_hash` to produce a
331-
signature based on steps the workflow actually executed, instead of a
332-
static bytecode scan that misses attribute-access patterns.
333-
"""
334-
self._observed_step_names.add(name)
335-
336318
def _get_step_cache_key(self, step_name: str) -> tuple[str, int]:
337319
idx = self._step_call_counters.get(step_name, 0)
338320
self._step_call_counters[step_name] = idx + 1
@@ -465,14 +447,10 @@ async def __call__(self, *args: Any, **kwargs: Any) -> R:
465447
return await self._func(*args, **kwargs)
466448

467449
cache_key = ctx._get_step_cache_key(self.name)
468-
ctx._record_observed_step(self.name)
469450
found, cached = ctx._get_cached_result(cache_key)
470451
if found:
471-
# Replay path: emit the dedicated ``executor_bypassed`` event type
472-
# (distinct from executor_invoked/completed/failed) so consumers
473-
# can unambiguously identify cache-hit replays. deepcopy is
474-
# deferred to the live branch below so replays work even when
475-
# arguments are not deepcopyable (e.g. open sessions, locks).
452+
# Dedicated bypass event so consumers can tell cache-hit replays
453+
# apart from fresh executions.
476454
await ctx.add_event(WorkflowEvent.executor_bypassed(self.name, cached))
477455
return cached # type: ignore[return-value, no-any-return]
478456

@@ -481,9 +459,8 @@ async def __call__(self, *args: Any, **kwargs: Any) -> R:
481459
if self._ctx_param_name is not None and self._ctx_param_name not in call_kwargs:
482460
call_kwargs[self._ctx_param_name] = ctx
483461

484-
# Live execution path — defensive deepcopy of args for the event log
485-
# only. If deepcopy fails (non-deepcopyable args), fall back to the
486-
# original mapping; the events are diagnostic, not authoritative.
462+
# Defensive deepcopy for the event log only; fall back to the live
463+
# reference so non-deepcopyable args (locks, sockets) don't fail.
487464
if args or kwargs:
488465
try:
489466
invocation_data: Any = deepcopy({"args": args, "kwargs": kwargs})
@@ -645,26 +622,17 @@ def __init__(
645622
self.description = description
646623
self._checkpoint_storage = checkpoint_storage
647624
self._is_running = False
648-
# Last message used to invoke the workflow (for replay on resume).
649-
# Cleared on clean completion (no pending requests) so a later
650-
# response-only call can't silently replay with a stale message.
625+
# Replay state: cleared on clean completion so later responses-only
626+
# calls can't silently replay with stale data from a prior run.
651627
self._last_message: Any = None
652-
# Step cache from the last run (for response-only replay without checkpoint)
653628
self._last_step_cache: dict[tuple[str, int], Any] = {}
654-
# IDs of the requests a caller is expected to answer on the next run.
655-
# Empty after clean completion; populated when the workflow interrupts
656-
# with pending request_info events.
657629
self._last_pending_request_ids: set[str] = set()
658630

659-
# Validate the signature once, at decoration time, instead of failing
660-
# with a confusing TypeError on the first call.
631+
# Signature arity is validated once at decoration time.
661632
self._non_ctx_param_names = self._classify_signature(func)
662633

663634
# Discover step names referenced in the function for signature hash
664635
self._step_names = self._discover_step_names(func)
665-
# Additional step names accumulated from observed executions (covers
666-
# attribute-access patterns the static discovery misses).
667-
self._runtime_step_names: set[str] = set()
668636

669637
# Compute a stable signature hash
670638
self.graph_signature_hash = self._compute_signature_hash()
@@ -784,12 +752,10 @@ def run(
784752
"""
785753
self._validate_run_params(message, responses, checkpoint_id)
786754
if responses and checkpoint_id is None:
787-
# Response-only resume: require at least one key to match an
788-
# actually-pending request on this instance. Prevents silent
789-
# replay against stale singleton state after a clean completion.
790-
# Callers whose workflows interrupt multiple times in sequence
791-
# legitimately accumulate all prior responses plus the latest one
792-
# — accept the call as long as any key matches the current set.
755+
# Require at least one response key to match a currently-pending
756+
# request; prevents silent replay against stale state while still
757+
# allowing callers to accumulate prior answers across multi-round
758+
# HITL.
793759
if not self._last_pending_request_ids:
794760
raise ValueError(
795761
f"responses={list(responses)!r} do not correspond to any pending request on "
@@ -956,13 +922,6 @@ async def _on_step_completed() -> None:
956922
if return_value is not None:
957923
await ctx.add_event(WorkflowEvent.output(self.name, return_value))
958924

959-
# Record observed step names as a diagnostic set (not mixed
960-
# into graph_signature_hash — that would mutate the hash
961-
# across runs and break per-step checkpoint restore). The
962-
# signature already includes co_code, which catches the
963-
# attribute-access case the static scan missed.
964-
self._runtime_step_names |= ctx._observed_step_names
965-
966925
# Persist step cache for response-only replay
967926
self._last_step_cache = dict(ctx._step_cache)
968927

@@ -990,9 +949,7 @@ async def _on_step_completed() -> None:
990949
with _framework_event_origin():
991950
yield WorkflowEvent.status(WorkflowRunState.IDLE_WITH_PENDING_REQUESTS)
992951
else:
993-
# Clean completion: clear the cross-run replay state so a
994-
# later `run(responses=...)` can't silently replay against
995-
# stale singleton state from this run.
952+
# Clean completion — drop cross-run replay state.
996953
self._last_message = None
997954
self._last_step_cache = {}
998955
self._last_pending_request_ids = set()
@@ -1005,7 +962,6 @@ async def _on_step_completed() -> None:
1005962
# Persist step cache for response-only replay
1006963
self._last_step_cache = dict(ctx._step_cache)
1007964
self._last_pending_request_ids = set(ctx._pending_requests)
1008-
self._runtime_step_names |= ctx._observed_step_names
1009965

1010966
# HITL interruption — yield events collected so far
1011967
for event in ctx._get_events():
@@ -1118,17 +1074,12 @@ async def _save_checkpoint(
11181074
return await storage.save(checkpoint)
11191075

11201076
def _compute_signature_hash(self) -> str:
1121-
"""Compute a stable hash that identifies this workflow's code shape.
1122-
1123-
The hash mixes:
1124-
* the workflow name,
1125-
* statically-discovered step names (from ``co_names`` scan),
1126-
* a digest of the function's bytecode and ``co_names``, so changes to
1127-
the function body invalidate old checkpoints even when the user
1128-
accesses steps via module / class attributes (``my_steps.fetch`` /
1129-
``Steps.fetch``) that the static scan misses. The code digest is
1130-
the load-bearing part of this hash; ``steps`` is kept for
1131-
human-readable diffing only.
1077+
"""Stable hash of the workflow's code shape.
1078+
1079+
Mixes workflow name, statically-discovered step names, and a digest
1080+
of ``__code__.co_code`` + ``co_names``. The code digest catches
1081+
body changes that step-name discovery misses (e.g. attribute-access
1082+
step references).
11321083
"""
11331084
code = getattr(self._func, "__code__", None)
11341085
co_code_hex = hashlib.sha256(code.co_code).hexdigest() if code is not None else ""
@@ -1149,10 +1100,10 @@ def _discover_step_names(func: Callable[..., Any]) -> list[str]:
11491100
"""Extract step names referenced by the workflow function.
11501101
11511102
Inspects the function's ``__code__.co_names`` and global scope for
1152-
``StepWrapper`` instances. Steps accessed through module or class
1153-
attributes (e.g. ``my_steps.fetch``, ``Steps.fetch``) are picked up
1154-
later via :meth:`RunContext._record_observed_step` as they execute;
1155-
the signature hash mixes both sources.
1103+
``StepWrapper`` instances. Steps accessed via module or class
1104+
attributes (``my_steps.fetch``) are missed here, but
1105+
:meth:`_compute_signature_hash` still captures them through the
1106+
``co_code`` digest.
11561107
"""
11571108
names: list[str] = []
11581109
globs = getattr(func, "__globals__", {})
@@ -1335,15 +1286,15 @@ def __init__(
13351286
context_providers: Sequence[Any] | None = None,
13361287
**kwargs: Any,
13371288
) -> None:
1289+
# kwargs is accepted for signature parity with graph Workflow.as_agent
1290+
# but not otherwise consumed.
1291+
del kwargs
13381292
self._workflow = workflow
13391293
self.name = name or workflow.name
13401294
self.id = f"FunctionalWorkflowAgent_{self.name}"
13411295
self.description: str | None = description if description is not None else workflow.description
13421296
self.context_providers: Sequence[Any] | None = context_providers
13431297
self._pending_requests: dict[str, WorkflowEvent[Any]] = {}
1344-
# Keep extra kwargs around for forward compatibility but don't act on
1345-
# them — signature parity with graph Workflow.as_agent is the goal.
1346-
self._extra_kwargs: dict[str, Any] = dict(kwargs)
13471298

13481299
@property
13491300
def pending_requests(self) -> dict[str, WorkflowEvent[Any]]:

0 commit comments

Comments
 (0)