Tölvera LLM Agent: 6-Week Implementation Plan
Current State Analysis
Already Implemented:
- Products of Programmatic Experts system for behavior synthesis
- Species interactions via species_manager.py
- Behavior decomposition for complex descriptions via behavior_manager.py
- Error detection and correction pipeline
- Kernel accumulation and learning system
Additions Needed:
- Temporal state management and multi-timescale dynamics
- Cognitive capacities (memory, learning, anticipation)
- Spatial field systems (territories, resources)
- OSC mapping and sonification framework
- Evolution and adaptation mechanisms
- Textualize-based CLI interface (started; finishing it up is left for Week 6)
Week 1 (July 14-20): Temporal & State Integration
Goal
Extend the State system to support temporal dynamics and implement time-based behaviors.
Implementation Tasks
1.1 Temporal State Extension
I'm thinking of having a manager, much like the species and behavior managers, that takes over the main control of this aspect of generation.
```python
# In src/tolvera/llm/temporal_manager.py
class TemporalStateManager:
    def __init__(self, tv):
        self.tv = tv
        self.setup_temporal_states()

    def setup_temporal_states(self):
        # Core temporal state for all particles
        self.tv.s.temporal = {
            "state": {
                "phase": (ti.f32, 0.0, 2 * 3.14159),
                "frequency": (ti.f32, 0.1, 10.0),
                "energy": (ti.f32, 0.0, 1.0),
                "age": (ti.i32, 0, 10000),
                "last_pos": (ti.math.vec2, 0.0, 1.0),
                "activity": (ti.f32, 0.0, 1.0),
            },
            "shape": self.tv.pn,
            "osc": ("get", "set", "stream"),
            "randomise": True
        }
        # Global temporal state
        self.tv.s.global_time = {
            "state": {
                "day_phase": (ti.f32, 0.0, 1.0),      # 0=night, 0.5=noon, 1=night
                "season": (ti.i32, 0, 3),             # 0=spring, 1=summer, etc.
                "global_energy": (ti.f32, 0.0, 1.0),  # Total system energy
            },
            "shape": 1,
            "osc": ("get", "stream"),
            "randomise": False
        }
```

1.2 Temporal Behavior Experts
With this temporal manager in place, we should be able to have particles follow temporal dynamics like these:
```python
temporal_behaviors = [
    "particles oscillate with individual frequencies (stored in temporal.frequency state)",
    "particles with low energy move slower (by multiplying velocity by energy level)",
    "particles increase activity during day_phase (0.25 to 0.75 (daytime))",
    "particles synchronize phases with nearby particles (strength 0.5)",
    "particles lose energy over time (at rate 0.001 per frame)",
    "particles remember last position and avoid returning to it"
]
```

1.3 Multi-Timescale Update Kernel
Different temporal components should be updated at appropriate frequencies. This is not the actual code, but we could implement something like this:
```python
@ti.kernel
def multi_timescale_update(frame_count: ti.i32):
    # Frame count is passed in as an argument (global_time has no age field)
    # Every frame: age, position memory
    for i in range(tv.pn):
        tv.s.temporal.field[i].age += 1
        tv.s.temporal.field[i].last_pos = tv.p.field[i].pos
    # Every 10 frames: energy updates
    if frame_count % 10 == 0:
        for i in range(tv.pn):
            # Energy decay
            tv.s.temporal.field[i].energy *= 0.99
    # Every 100 frames: phase updates, global time
    if frame_count % 100 == 0:
        tv.s.global_time.field[0].day_phase = \
            (tv.s.global_time.field[0].day_phase + 0.01) % 1.0
```

1.4 Integration with Current System
Modify expert synthesis to access temporal states:
```python
# Update prompt templates to include temporal state access
temporal_prompt_addition = """
You can access temporal states via:
- tv.s.temporal.field[i].phase - oscillation phase
- tv.s.temporal.field[i].energy - current energy level
- tv.s.temporal.field[i].age - particle age
- tv.s.global_time.field[0].day_phase - global day/night cycle
# More here if we need this...
"""
```

Week 1 Deliverables
- Working temporal state system with proper Taichi integration
- Multi-timescale update mechanism
- Integration with the existing synthesis pipeline
- Example: Day/night cycle affecting particle behavior
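As a rough, Taichi-free sketch of that last deliverable: the day/night cycle could modulate activity with a smooth cosine bump over day_phase. The helper names and the cosine shape here are my own invention, not part of the existing system.

```python
import math

def day_activity(day_phase: float) -> float:
    """Map the global day_phase (0..1, 0=night, 0.5=noon) to an
    activity multiplier in [0, 1] via a smooth cosine bump."""
    # Peaks at day_phase=0.5 (noon), bottoms out at 0.0/1.0 (night)
    return 0.5 - 0.5 * math.cos(2 * math.pi * day_phase)

def step_day_phase(day_phase: float, increment: float = 0.01) -> float:
    """Advance the day/night cycle, wrapping at 1.0
    (matching the multi-timescale kernel's update)."""
    return (day_phase + increment) % 1.0

# Noon is maximally active, midnight is dormant
print(day_activity(0.5))  # 1.0
print(day_activity(0.0))  # 0.0
```

Inside a Taichi kernel the same curve would just multiply each particle's activity or velocity.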
Week 2 (July 21-27): Cognitive Capacities Implementation
Goal
Implement cognitive capacities (orienting response, sensing, memory, learning, etc.)
Implementation Tasks
2.1 Cognitive State System
```python
# In src/tolvera/llm/cognitive_manager.py ???
class CognitiveStateManager:
    def __init__(self, tv):
        self.tv = tv
        self.setup_cognitive_states()

    def setup_cognitive_states(self):
        # Individual cognitive states
        self.tv.s.cognitive = {
            "state": {
                # Sensing & Perception
                "attention_focus": (ti.math.vec2, -1.0, 1.0),  # Current attention vector
                "perceived_threat": (ti.f32, 0.0, 1.0),        # Threat level
                "perceived_opportunity": (ti.f32, 0.0, 1.0),   # Opportunity level
                # Memory (short-term)
                "memory_pos": (ti.math.vec3, 0.0, 1.0),        # [x, y, time_ago]
                "memory_valence": (ti.f32, -1.0, 1.0),         # Good/bad memory
                # Learning & Adaptation
                "success_rate": (ti.f32, 0.0, 1.0),            # Learning metric
                "behavior_threshold": (ti.f32, 0.0, 1.0),      # Adaptive threshold
                # Decision Making
                "current_goal": (ti.i32, 0, 5),                # Current behavioral goal
                "motivation": (ti.f32, 0.0, 1.0),              # Motivation level
            },
            "shape": self.tv.pn,
            "osc": ("get", "set"),
            "randomise": True
        }
```

2.2 Cognitive Behavior Experts
These are drawn from the basal cognitive capacities and adapted to what our system allows us to do.
```python
cognitive_behaviors = [
    # Orienting Response
    "particles orient towards sudden changes in nearby particle density",
    # Sensing/Perception
    "particles sense threats when species 1 approaches (update perceived_threat)",
    # Discrimination
    "particles distinguish between food (green areas) and danger (red areas)",
    # Memory
    "particles remember location of last food source for 1000 frames",
    # Valence
    "particles assign positive valence to areas with high energy gain",
    # Decision Making
    "particles choose between explore, forage, or flee based on current state",
    # Problem Solving
    "particles find alternate paths when direct route is blocked",
    # Error Detection
    "particles detect and correct course when moving away from goal",
    # Learning
    "particles adjust behavior_threshold based on success_rate",
    # Anticipation
    "particles predict future positions of moving targets and intercept",
    # Communication
    "particles signal others when food is found"
]
```

2.3 Attention Mechanism
We need a way to calculate where particles should focus attention based on stimuli.
```python
@ti.func
def calculate_attention_focus(i: ti.i32) -> ti.math.vec2:
    pos_i = tv.p.field[i].pos
    max_salience = 0.0
    focus_dir = ti.math.vec2(0.0, 0.0)
    # Check all nearby particles for features
    for j in range(tv.pn):
        if i != j:
            pos_j = tv.p.field[j].pos
            dist = (pos_j - pos_i).norm()
            if dist < 100.0:  # Attention range (distance)
                # Salience based on movement, species, etc.
                vel_j = tv.p.field[j].vel
                salience = vel_j.norm() * 0.5
                if tv.p.field[j].species == 1:  # Predator species
                    salience *= 2.0  # Predators are very salient
                if salience > max_salience:
                    max_salience = salience
                    focus_dir = (pos_j - pos_i).normalized()
    return focus_dir
```

2.4 Learning System
I don't think this is quite how it would be implemented, but something along these lines would get the species to learn from its own behaviors and adjust accordingly.
```python
# Expert for adaptive behavior
adaptive_expert = """
particles learn from success:
- if current behavior leads to energy gain, decrease behavior_threshold
- if current behavior leads to energy loss, increase behavior_threshold
- use behavior_threshold to modulate response strength
"""
```

Week 2 Deliverables
- Cognitive state system implementation
- Attention focus mechanism
- Simple learning/adaptation system
- Memory-based navigation
- Example: Particles learning to avoid predators without explicit instruction on avoiding that species
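A Taichi-free sketch of the adaptive-threshold rule from 2.4, in plain Python. The function name, learning rate, and clamping are all invented here for illustration; the real version would live in a kernel operating on the cognitive state fields.

```python
def update_threshold(threshold: float, energy_delta: float,
                     lr: float = 0.05) -> float:
    """Adapt a behavior threshold from an energy outcome:
    gains lower the threshold (the behavior fires more easily),
    losses raise it. Result is clamped to [0, 1]."""
    if energy_delta > 0:
        threshold -= lr
    elif energy_delta < 0:
        threshold += lr
    return min(1.0, max(0.0, threshold))

# Repeated success drives the threshold down
t = 0.5
for _ in range(3):
    t = update_threshold(t, energy_delta=0.1)
print(round(t, 2))  # 0.35
```

The threshold would then gate or scale the response strength of the behavior it belongs to.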
Week 3 (July 28-Aug 3): Advanced Spatial Dynamics & Environmental State
Goal
Implement spatial field systems for territories, resources, and environmental influences.
Implementation Tasks
3.1 Spatial Field States
```python
# In src/tolvera/llm/spatial_manager.py
class SpatialFieldManager:
    def __init__(self, tv, grid_resolution=64):
        self.tv = tv
        self.grid_res = grid_resolution
        self.setup_spatial_fields()

    def setup_spatial_fields(self):
        self.tv.s.pheromones = {
            "state": {
                "food_trail": (ti.f32, 0.0, 1.0),      # Food pheromone
                "danger_signal": (ti.f32, 0.0, 1.0),   # Danger pheromone
                "territory_mark": (ti.f32, 0.0, 1.0),  # Territory marker
                "mating_signal": (ti.f32, 0.0, 1.0),   # Mating pheromone
                "decay_rate": (ti.f32, 0.99, 0.999),   # Per-cell decay
            },
            "shape": (self.grid_res, self.grid_res),
            "osc": ("get", "stream"),
            "randomise": False
        }
        # Resource distribution (I haven't thought through this one yet)
        self.tv.s.resources = {
            "state": {
                "food_amount": (ti.f32, 0.0, 1.0),
                "food_quality": (ti.f32, 0.0, 1.0),
                "regeneration_rate": (ti.f32, 0.0, 0.01),
                "resource_type": (ti.i32, 0, 3),
            },
            "shape": (self.grid_res // 2, self.grid_res // 2),
            "osc": ("get", "set"),
            "randomise": True
        }
        # Environmental forces (not sure if this is relevant???)
        self.tv.s.environment = {
            "state": {
                "flow_field": (ti.math.vec2, -1.0, 1.0),
                "temperature": (ti.f32, 0.0, 1.0),
                "obstacles": (ti.f32, 0.0, 1.0),
            },
            "shape": (self.grid_res // 4, self.grid_res // 4),
            "osc": ("get", "set"),
            "randomise": False
        }
```

3.2 Pheromone System Experts
A pheromone diffusion kernel. I haven't tested any of this and really don't know if it will work, but it would be really neat if possible :)
```python
@ti.kernel
def diffuse_pheromones(grid_res: ti.i32, decay_rate: ti.f32):
    # NB: updates in place; a real version should double-buffer
    # to avoid read/write races between neighbouring cells
    for i, j in ti.ndrange(grid_res, grid_res):
        # Simple diffusion with 4-neighbors, per pheromone channel
        for pheromone_type in ti.static(range(4)):
            center = tv.s.pheromones.field[i, j][pheromone_type]
            neighbors_sum = 0.0
            count = 0
            for di, dj in ti.static([(-1, 0), (1, 0), (0, -1), (0, 1)]):
                ni, nj = i + di, j + dj
                if 0 <= ni < grid_res and 0 <= nj < grid_res:
                    neighbors_sum += tv.s.pheromones.field[ni, nj][pheromone_type]
                    count += 1
            # Diffusion + decay
            if count > 0:
                new_val = center * 0.9 + neighbors_sum * 0.025
                tv.s.pheromones.field[i, j][pheromone_type] = new_val * decay_rate
```

3.3 Resource Consumption System
```python
resource_behaviors = [
    "particles consume food_amount from resources (when energy < 0.5)",
    "high quality resources provide more energy gain per consumption",
    "resources regenerate slowly based on regeneration_rate",
    "species have preferences for different resource_type values",
    "overconsumption reduces regeneration_rate temporarily"
]
```

3.4 Environmental Influence
```python
environmental_behaviors = [
    "particles are pushed by flow_field vectors in environment",
    "particle speed affected by local temperature (slower when cold)",
    "particles avoid cells where obstacles > 0.8",
    "particles path-find around obstacle regions"
]
```

Week 3 Deliverables
- Spatial field state system (resources, environment)
- Resource consumption and regeneration
- Environmental forces and obstacles
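The consumption/regeneration loop from 3.3 could be prototyped off-GPU first, something like the plain-Python sketch below. The function names, the bite size, and the clamping are invented for illustration; the state names match the resources fields above.

```python
def consume(food_amount: float, food_quality: float, energy: float,
            bite: float = 0.1):
    """One consumption step: a hungry particle (energy < 0.5) takes a
    bite; energy gain scales with food_quality. Returns the updated
    (food_amount, energy), clamped to [0, 1]."""
    if energy < 0.5 and food_amount > 0.0:
        eaten = min(bite, food_amount)
        food_amount -= eaten
        energy += eaten * food_quality
    return max(0.0, food_amount), min(1.0, energy)

def regenerate(food_amount: float, regeneration_rate: float) -> float:
    """Resources regrow slowly toward full, once per frame."""
    return min(1.0, food_amount + regeneration_rate)

food, energy = consume(food_amount=0.5, food_quality=0.8, energy=0.3)
print(round(food, 2), round(energy, 2))  # 0.4 0.38
```

The "overconsumption reduces regeneration_rate" behavior would hook in here by lowering regeneration_rate whenever a cell's food_amount drops below some floor.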
Week 4 (Aug 4-10): OSC & Sonification Framework
Goal
Build an OSC mapping system for real-time sonification of emergent behaviors (reuse the existing OSC mapping infrastructure, but allow the LLM to generate these mappings too).
Implementation Tasks
4.1 OSC Mapping Architecture
```python
# In src/tolvera/llm/sonification_manager.py
class SonificationManager:
    def __init__(self, tv, osc_client):
        self.tv = tv
        self.osc = osc_client
        self.mappings = {}
        self.analysis_results = {}
        self.smoothing_values = {}

    def add_mapping(self, source_param, osc_address,
                    transform=None, smoothing=0.0,
                    threshold=None, scale=(0.0, 1.0)):
        # Keyed by source_param; this will get confusing really quick,
        # but we'll see what the errors are when we get here.
        self.mappings[source_param] = {
            'address': osc_address,
            'transform': transform or (lambda x: x),
            'smoothing': smoothing,
            'threshold': threshold,
            'scale': scale,
            'last_value': 0.0
        }
```

4.2 Real-time Analysis Kernels
```python
@ti.kernel
def analyze_emergence_metrics():
    # Flocking metrics
    alignment_sum = 0.0
    cohesion_sum = 0.0
    separation_avg = 0.0
    # Species interaction metrics (MAX_SPECIES must be a compile-time constant)
    species_distances = ti.Matrix([[0.0 for _ in range(MAX_SPECIES)]
                                   for _ in range(MAX_SPECIES)])
    # Spatial metrics
    total_density = 0.0
    density_variance = 0.0
    # Pheromone metrics
    total_pheromone = 0.0
    pheromone_clusters = 0
    # Movement metrics
    avg_velocity = 0.0
    velocity_variance = 0.0
    angular_momentum = 0.0
    # Could be slow when calculating this (O(n^2)) so I'm not sure on this one
    for i in range(tv.pn):
        if tv.p.field[i].active > 0:
            pos_i = tv.p.field[i].pos
            vel_i = tv.p.field[i].vel
            species_i = tv.p.field[i].species
            # Local neighborhood analysis
            local_alignment = ti.Vector([0.0, 0.0])
            local_center = ti.Vector([0.0, 0.0])
            neighbor_count = 0
            for j in range(tv.pn):
                if i != j and tv.p.field[j].active > 0:
                    pos_j = tv.p.field[j].pos
                    dist = (pos_j - pos_i).norm()
                    if dist < 50.0:
                        local_alignment += tv.p.field[j].vel.normalized()
                        local_center += pos_j
                        neighbor_count += 1
            # Store results in global state???
            # Other calculations...
```

4.4 OSC Streaming
```python
# OSC streaming with data reduction
class OSCStreamer:
    def __init__(self, osc_client, max_rate=100):
        self.osc = osc_client
        self.max_rate = max_rate
        self.bundle_size = 32
        self.spatial_bins = 16  # Binning for data reduction

    def stream_particle_data(self):
        # Bin particles spatially to reduce data ???
        bins = [[[] for _ in range(self.spatial_bins)]
                for _ in range(self.spatial_bins)]
        for i in range(tv.pn):
            if tv.p.field[i].active > 0:
                # min() keeps a particle at pos == tv.x inside the last bin
                x_bin = min(int(tv.p.field[i].pos[0] / tv.x * self.spatial_bins),
                            self.spatial_bins - 1)
                y_bin = min(int(tv.p.field[i].pos[1] / tv.y * self.spatial_bins),
                            self.spatial_bins - 1)
                bins[x_bin][y_bin].append(i)
        # Send aggregated data per bin
        bundle = []
        for x in range(self.spatial_bins):
            for y in range(self.spatial_bins):
                if bins[x][y]:
                    avg_vel = self.calculate_bin_average_velocity(bins[x][y])
                    density = len(bins[x][y]) / (tv.pn / (self.spatial_bins ** 2))
                    bundle.append(('/bin/data', x, y, avg_vel, density))
                    if len(bundle) >= self.bundle_size:
                        # Or something like this to reduce data streaming...
                        # this will need to be worked out pretty extensively
                        self.osc.send_bundle(bundle)
                        bundle = []
        if bundle:
            self.osc.send_bundle(bundle)  # flush the remainder
```

Week 4 Deliverables
- OSC mapping generation (LLM)
- Real-time analysis kernels for emergence metrics
- MaxMSP/PureData template patch (one example patch is probably all I can do)
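The per-mapping smoothing/threshold logic sketched in SonificationManager.add_mapping might reduce to something like this plain-Python helper (process_value is an invented name; the one-pole smoothing and change-gating are my assumptions about what those mapping fields would do):

```python
def process_value(raw, last, smoothing=0.0, threshold=None,
                  scale=(0.0, 1.0)):
    """One-pole smoothing, optional change-threshold gating, and a
    linear rescale before a value goes out over OSC. Returns None
    when the change is below threshold (nothing would be sent)."""
    # Exponential smoothing: 0.0 = passthrough, toward 1.0 = very sluggish
    value = smoothing * last + (1.0 - smoothing) * raw
    if threshold is not None and abs(value - last) < threshold:
        return None  # suppress the message
    lo, hi = scale
    return lo + value * (hi - lo)

print(process_value(1.0, last=0.0, smoothing=0.5))     # 0.5
print(process_value(0.101, last=0.1, threshold=0.01))  # None
```

Each mapping's 'last_value' entry would be updated with the smoothed value after every send.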
Week 5 (Aug 11-17): Fix Everything That Isn’t Working
Goal
Feature freeze and clean up code here.
Week 6 (Aug 18-24): CLI, Integration & Polish
Goal
Finish the Textualize-based CLI interface and finalize integration components.
Implementation Tasks
6.1 Textualize CLI Architecture
This would primarily live in src/tolvera/llm/cli/tolvera_cli.py. We've already had a good deal of success with a CLI for this in prior weeks, but this is a good point to integrate it fully into the library so it's easy to call.
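Before the Textual UI itself, the command routing could be prototyped as a plain dispatcher that the app later wraps. Everything below, including the command names, is an invented sketch rather than the existing CLI:

```python
class TolveraCLI:
    """Minimal command dispatcher a Textual app could later wrap.
    Each registered command would hand off to one of the managers
    built in the earlier weeks."""
    def __init__(self):
        self.commands = {}

    def command(self, name):
        # Decorator that registers a handler under a command name
        def register(fn):
            self.commands[name] = fn
            return fn
        return register

    def run(self, line: str) -> str:
        name, _, arg = line.partition(" ")
        if name not in self.commands:
            return f"unknown command: {name}"
        return self.commands[name](arg)

cli = TolveraCLI()

@cli.command("behave")
def behave(arg: str) -> str:
    # Would hand the description to the behavior synthesis pipeline
    return f"synthesizing: {arg}"

print(cli.run("behave particles oscillate"))  # synthesizing: particles oscillate
print(cli.run("quit"))                        # unknown command: quit
```

Keeping the dispatcher separate from the UI also makes the save/load and export deliverables testable without a terminal.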
Week 6 Deliverables
- Complete Textualize-based CLI interface
- Save/load functionality for configurations
- Export capability for standalone scripts
- Tutorial, help, and documentation