-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathcli.py
More file actions
336 lines (267 loc) · 12.2 KB
/
cli.py
File metadata and controls
336 lines (267 loc) · 12.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
#!/usr/bin/env python3
"""CLI for managing LlamaIndex Workflow deployments on AWS Bedrock AgentCore.
Covers the full lifecycle — deploy, invoke, monitor, and teardown — plus
direct access to all AgentCore entrypoint actions (run, poll, events,
human-in-the-loop, cancel, list).
Usage:
python cli.py deploy --deployment-role <ARN> --execution-role <ARN>
python cli.py invoke --gov-id doc.pdf --utility-bill doc.pdf --bank-statement doc.pdf
python cli.py status [--handler-id ID]
python cli.py events [--handler-id ID]
python cli.py cancel [--handler-id ID]
python cli.py send-event --event '{"feedback": "approved"}'
python cli.py workflows
python cli.py handlers [--workflow NAME] [--status STATUS]
python cli.py destroy
"""
from __future__ import annotations
import argparse
import base64
import json
import logging
import uuid
from pathlib import Path
import boto3
import httpx
from llama_agents.agentcore.deploy import AgentCoreDeployer, DeployedRuntime
PROJECT_DIR = Path(__file__).parent
METADATA_FILE = PROJECT_DIR / ".agentcore" / "deployment.json"
def _save_metadata(runtime: DeployedRuntime) -> None:
"""Persist deployment metadata for later commands."""
METADATA_FILE.parent.mkdir(exist_ok=True)
METADATA_FILE.write_text(json.dumps(runtime.to_dict(), indent=2))
def _load_metadata() -> DeployedRuntime:
"""Load deployment metadata from a previous deploy."""
if not METADATA_FILE.exists():
raise FileNotFoundError("No deployment found. Run 'deploy' first.")
return DeployedRuntime.from_dict(json.loads(METADATA_FILE.read_text()))
def _get_deployer(profile: str | None, region: str | None = None) -> tuple[AgentCoreDeployer, DeployedRuntime]:
"""Build an AgentCoreDeployer + metadata for post-deploy commands."""
meta = _load_metadata()
deployer = AgentCoreDeployer(
session=boto3.Session(region_name=region or meta.region, profile_name=profile),
deployment_role="",
execution_role="",
)
return deployer, meta
def _invoke(args: argparse.Namespace, payload: dict) -> dict:
"""Send a payload to the deployed (or local) runtime and return the response."""
if args.local:
resp = httpx.post("http://localhost:8080/invocations", json=payload)
resp.raise_for_status()
return resp.json()
deployer, meta = _get_deployer(args.profile)
session_id = payload.get("handler_id") or str(uuid.uuid4())
return deployer.invoke(meta.arn, payload, session_id=session_id)
def _decode_result(data: dict) -> dict:
"""Deserialize the nested pydantic/envelope result using JsonSerializer."""
raw = data.get("result")
if not raw or not isinstance(raw, str):
return data
try:
parsed = json.loads(raw)
except (json.JSONDecodeError, TypeError):
return data
from workflows.context.serializers import JsonSerializer
deserialized = JsonSerializer().deserialize_value(parsed)
# deserialized is an EventEnvelopeWithMetadata; extract the inner event value
if hasattr(deserialized, "value"):
result = deserialized.value
else:
result = deserialized
data = {**data, "result": result}
return data
def _print_json(data: dict) -> None:
print(json.dumps(data, indent=2))
# ---------------------------------------------------------------------------
# Commands
# ---------------------------------------------------------------------------
def cmd_deploy(args: argparse.Namespace) -> None:
"""Build, push, and deploy the workflow to AgentCore."""
session = boto3.Session(region_name=args.region, profile_name=args.profile)
deployer = AgentCoreDeployer(
session=session,
deployment_role=args.deployment_role,
execution_role=args.execution_role,
)
runtime = deployer.deploy(project_dir=PROJECT_DIR)
_save_metadata(runtime)
print(f"\nDeployed successfully!")
print(f" Runtime: {runtime.name}")
print(f" ARN: {runtime.arn}")
print(f"\nTest with:")
print(f" python cli.py invoke \\")
print(f" --gov-id sample_docs/drivers_license.pdf \\")
print(f" --utility-bill sample_docs/utility_bill.pdf \\")
print(f" --bank-statement sample_docs/bank_statement.pdf")
def _encode_doc(file_path: str, doc_type: str) -> dict:
"""Read a file from disk and return a KYCDocument-shaped dict."""
path = Path(file_path)
if not path.exists():
raise FileNotFoundError(f"File not found: {path}")
return {
"file_b64": base64.b64encode(path.read_bytes()).decode(),
"file_name": path.name,
"doc_type": doc_type,
}
def cmd_invoke(args: argparse.Namespace) -> None:
"""Run the KYC workflow with document files."""
session_id = args.session_id or str(uuid.uuid4())
payload: dict = {
"action": "run" if args.wait else "run_nowait",
"handler_id": session_id,
"start_event": {
"documents": [
_encode_doc(args.gov_id, "government_id"),
_encode_doc(args.utility_bill, "utility_bill"),
_encode_doc(args.bank_statement, "bank_statement"),
]
},
}
if args.workflow:
payload["workflow"] = args.workflow
result = _invoke(args, payload)
_print_json(_decode_result(result))
print(f"\nSession ID: {session_id}")
print(f" Re-use with: --session-id {session_id}")
def cmd_status(args: argparse.Namespace) -> None:
"""Check the status and result of a workflow handler."""
if not args.handler_id:
raise SystemExit("Error: --handler-id is required for 'status'")
payload = {
"action": "get_result",
"handler_id": args.handler_id,
}
_print_json(_decode_result(_invoke(args, payload)))
def cmd_events(args: argparse.Namespace) -> None:
"""Retrieve recorded events for a workflow handler."""
if not args.handler_id:
raise SystemExit("Error: --handler-id is required for 'events'")
payload: dict = {
"action": "get_events",
"handler_id": args.handler_id,
}
if args.after_sequence is not None:
payload["after_sequence"] = args.after_sequence
if args.limit is not None:
payload["limit"] = args.limit
_print_json(_invoke(args, payload))
def cmd_send_event(args: argparse.Namespace) -> None:
"""Send an event into a running workflow (human-in-the-loop)."""
if not args.handler_id:
raise SystemExit("Error: --handler-id is required for 'send-event'")
event_data = json.loads(args.event)
payload: dict = {
"action": "send_event",
"handler_id": args.handler_id,
"event": event_data,
}
if args.step:
payload["step"] = args.step
_print_json(_invoke(args, payload))
def cmd_cancel(args: argparse.Namespace) -> None:
"""Cancel a running workflow handler."""
if not args.handler_id:
raise SystemExit("Error: --handler-id is required for 'cancel'")
payload: dict = {
"action": "cancel",
"handler_id": args.handler_id,
"purge": args.purge,
}
_print_json(_invoke(args, payload))
def cmd_workflows(args: argparse.Namespace) -> None:
"""List registered workflows."""
_print_json(_invoke(args, {"action": "list_workflows"}))
def cmd_handlers(args: argparse.Namespace) -> None:
"""List workflow handlers, optionally filtered."""
payload: dict = {"action": "list_handlers"}
if args.workflow:
payload["workflow"] = args.workflow
if args.status:
payload["status"] = args.status
_print_json(_invoke(args, payload))
def cmd_destroy(args: argparse.Namespace) -> None:
"""Tear down the deployment and clean up AWS resources."""
deployer, meta = _get_deployer(args.profile)
deployer.destroy_from_metadata(meta)
print("Cleanup complete.")
# ---------------------------------------------------------------------------
# Argument parsing
# ---------------------------------------------------------------------------
def _add_common_args(parser: argparse.ArgumentParser) -> None:
"""Add --local and --profile flags shared across invoke-style commands."""
parser.add_argument("--local", action="store_true", help="Target localhost:8080 instead of deployed runtime")
parser.add_argument("--profile", default=None, help="AWS CLI profile name")
def main() -> None:
"""CLI entry point."""
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
parser = argparse.ArgumentParser(
description="Manage LlamaIndex Workflow deployments on AWS Bedrock AgentCore"
)
sub = parser.add_subparsers(dest="command", required=True)
# -- deploy --
deploy_p = sub.add_parser("deploy", help="Build, push, and deploy to AgentCore")
deploy_p.add_argument("--deployment-role", required=True, help="IAM role ARN for CodeBuild")
deploy_p.add_argument("--execution-role", required=True, help="IAM role ARN for AgentCore Runtime")
deploy_p.add_argument("--region", default="us-east-1", help="AWS region")
deploy_p.add_argument("--profile", default=None, help="AWS CLI profile name")
# -- invoke --
invoke_p = sub.add_parser("invoke", help="Run the KYC workflow with document files")
invoke_p.add_argument("--gov-id", required=True, help="Path to Government ID PDF")
invoke_p.add_argument("--utility-bill", required=True, help="Path to Utility Bill PDF")
invoke_p.add_argument("--bank-statement", required=True, help="Path to Bank Statement PDF")
invoke_p.add_argument("--workflow", help="Specific workflow name (default: auto-detected)")
invoke_p.add_argument("--session-id", default=None, help="Session ID (reuse to get cached results)")
invoke_p.add_argument("--no-wait", dest="wait", action="store_false", default=True,
help="Return immediately without waiting for completion")
_add_common_args(invoke_p)
# -- status --
status_p = sub.add_parser("status", help="Check handler status and result")
status_p.add_argument("--handler-id", required=True, help="Handler/session ID to check")
_add_common_args(status_p)
# -- events --
events_p = sub.add_parser("events", help="Retrieve recorded workflow events")
events_p.add_argument("--handler-id", required=True, help="Handler/session ID")
events_p.add_argument("--after-sequence", type=int, default=None, help="Only events after this sequence number")
events_p.add_argument("--limit", type=int, default=None, help="Max events to return")
_add_common_args(events_p)
# -- send-event --
send_event_p = sub.add_parser("send-event", help="Send event into a running workflow (human-in-the-loop)")
send_event_p.add_argument("--handler-id", required=True, help="Handler/session ID")
send_event_p.add_argument("--event", required=True, help="JSON event payload")
send_event_p.add_argument("--step", default=None, help="Target step name")
_add_common_args(send_event_p)
# -- cancel --
cancel_p = sub.add_parser("cancel", help="Cancel a running workflow handler")
cancel_p.add_argument("--handler-id", required=True, help="Handler/session ID to cancel")
cancel_p.add_argument("--purge", action="store_true", help="Also purge handler data")
_add_common_args(cancel_p)
# -- workflows --
workflows_p = sub.add_parser("workflows", help="List registered workflows")
_add_common_args(workflows_p)
# -- handlers --
handlers_p = sub.add_parser("handlers", help="List workflow handlers")
handlers_p.add_argument("--workflow", default=None, help="Filter by workflow name")
handlers_p.add_argument("--status", default=None, help="Filter by status")
_add_common_args(handlers_p)
# -- destroy --
destroy_p = sub.add_parser("destroy", help="Tear down deployment and clean up")
destroy_p.add_argument("--profile", default=None, help="AWS CLI profile name")
args = parser.parse_args()
commands = {
"deploy": cmd_deploy,
"invoke": cmd_invoke,
"status": cmd_status,
"events": cmd_events,
"send-event": cmd_send_event,
"cancel": cmd_cancel,
"workflows": cmd_workflows,
"handlers": cmd_handlers,
"destroy": cmd_destroy,
}
commands[args.command](args)
if __name__ == "__main__":
main()