-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
111 lines (92 loc) · 3.9 KB
/
Copy pathmain.py
File metadata and controls
111 lines (92 loc) · 3.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import asyncio
import os
import uvicorn
from core.audio.pipeline import AudioPipeline
from core.memory.db import ContextDB
from core.memory.slm import SLMEngine
from core.memory.router import IntentRouter
from core.agent_builder import AgentBuilder
from dashboard.app import app, broadcast_state
from execution.sandbox import SandboxRunner
from execution.hitl import HITLEngine
class CoreEcho:
    """Glue object connecting audio input, intent handling and plugin execution.

    The audio pipeline is given :meth:`handle_audio_text` as its callback.
    Each text it delivers is stored, parsed into an intent, routed to a
    schema, optionally confirmed through the HITL engine, and finally
    executed as a plugin in the sandbox. Dashboard state is broadcast at
    each stage via ``broadcast_state``.
    """

    def __init__(self):
        """Create every collaborator with its default constructor arguments."""
        self.audio = AudioPipeline()
        self.db = ContextDB()
        self.slm = SLMEngine()
        self.router = IntentRouter()
        self.agent_builder = AgentBuilder()
        self.sandbox = SandboxRunner()
        self.hitl = HITLEngine(broadcast_callback=broadcast_state)

    async def handle_audio_text(self, text):
        """Process one piece of text delivered by the audio pipeline.

        Args:
            text: The text handed to the callback (presumably a speech
                transcript -- confirm against ``AudioPipeline.run``).

        The dashboard is always returned to the ``"idle"`` state once
        processing stops, including when a step raises. Exceptions are not
        swallowed; they still propagate to the caller.
        """
        print(f"Heard: {text}")
        await broadcast_state("listening", {})
        try:
            await self._process_utterance(text)
        finally:
            # Without this reset a failure in any step would leave the
            # dashboard stuck in "listening" or "datacard".
            await broadcast_state("idle", {})

    async def _process_utterance(self, text):
        """Run the store -> parse -> route -> confirm -> execute steps for ``text``.

        Returns early, without executing a plugin, when no intent is parsed,
        no schema matches, or the confirmation is declined. The caller is
        responsible for resetting the dashboard to ``"idle"``.
        """
        # NOTE(review): parse_intent, build_agent_from_query and
        # execute_plugin are called without ``await``, so they run to
        # completion on the event-loop thread. If any of them is slow it will
        # stall everything else on that loop -- consider asyncio.to_thread
        # once their thread-safety is confirmed.

        # 1. Store in DB
        self.db.add_memory("user", text)
        context = self.db.get_markdown_context()

        # 2. Parse intent
        parsed_intent = self.slm.parse_intent(text, context, self.router.schemas)
        if not parsed_intent:
            return

        # 3. Dynamic agent creation
        if parsed_intent.get("intent") == "create_agent":
            # "parameters" may be present but None; treat that like missing.
            requested = parsed_intent.get("parameters") or {}
            query = requested.get("query", text)
            print(f"No existing agent found for: '{query}'. Creating a new agent dynamically...")
            await broadcast_state("datacard", {
                "title": "Building Agent",
                "data": {"query": query, "status": "Generating code..."},
            })

            # Generate the new agent; the manifest is not needed here.
            new_agent_name, _manifest = self.agent_builder.build_agent_from_query(query)

            # Reload router schemas so the new agent can be routed to.
            self.router.load_schemas()
            print(f"Agent '{new_agent_name}' created successfully.")

            # Use the newly created agent directly, feeding it the raw text.
            parsed_intent = {
                "intent": new_agent_name,
                "parameters": {"input_data": text},
            }

        # 4. Route intent
        schema = self.router.route(parsed_intent)
        if not schema:
            print("No matching schema found.")
            return

        parameters = parsed_intent.get("parameters")

        # 5. Human-in-the-loop check for schemas flagged as destructive
        if schema.get("destructive", False):
            confirmed = await self.hitl.require_confirmation({
                "description": schema["description"],
                "parameters": parameters,
            })
            if not confirmed:
                print("HITL aborted.")
                return

        # 6. Execute
        await broadcast_state("datacard", {
            "title": f"Executing: {schema['name']}",
            "data": parameters,
        })

        # Locate plugin.py from the schema, falling back to the base plugin.
        plugin_dir = schema.get("plugin_dir", os.path.join("plugins", "base_plugin"))
        plugin_path = os.path.join(plugin_dir, "plugin.py")

        result = self.sandbox.execute_plugin(plugin_path, parameters)
        self.db.add_memory("system", str(result))

        # Keep the datacard visible briefly before the caller resets to idle.
        await asyncio.sleep(2)

    async def run(self):
        """Run the dashboard server and the audio pipeline concurrently.

        The dashboard is served by uvicorn on ``127.0.0.1:8000``; the audio
        pipeline is started with :meth:`handle_audio_text` as its callback.
        Both are awaited together on the current event loop.
        """
        # Start dashboard server
        server_config = uvicorn.Config(app, host="127.0.0.1", port=8000, log_level="info")
        server = uvicorn.Server(server_config)

        # Run everything concurrently
        await asyncio.gather(
            server.serve(),
            self.audio.run(self.handle_audio_text),
        )
# Script entry point: build the assistant and drive it until interrupted.
if __name__ == "__main__":
    echo = CoreEcho()
    main_coroutine = echo.run()
    try:
        asyncio.run(main_coroutine)
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop the process.
        print("Shutting down...")