Scheduler¶
The scheduler is the central coordinator. It runs a main loop that checks every subsystem on configurable intervals and dispatches worker threads to process tasks.
Modes¶
Daemon mode (run_daemon): Long-running process with a WorkerPool. Acquires a file lock on /tmp/istota-scheduler-daemon.lock to prevent duplicate instances. Handles SIGTERM/SIGINT for graceful shutdown. It also starts the persistent AsyncRuntime (see below) explicitly and stops it on shutdown.
Single-pass mode (run_scheduler): Runs all checks once, processes tasks until none remain or a max_tasks limit is hit, then exits. Used for testing and one-off runs. It shares process_one_task (which uses run_coro), so it lazily starts the same persistent runtime, then calls reset_async_runtime() before returning for a clean shutdown.
Persistent asyncio runtime¶
All Nextcloud Talk I/O runs on one long-lived asyncio loop on a dedicated daemon thread, against one pooled httpx.AsyncClient, instead of a fresh asyncio.run loop + fresh client per call (src/istota/async_runtime.py). This gives TCP/TLS connection reuse to Nextcloud and removes the per-call loop-teardown leak surface.
- AsyncRuntime owns the loop thread; submit(coro, *, timeout=None) bridges sync→async via run_coroutine_threadsafe. stop(timeout=10) cancels in-flight coroutines, runs cleanup hooks (closing the shared client), then stops the loop. The order is cancel-before-aclose, so a hook can't close the client out from under a live request.
- run_coro(coro, *, timeout=None) is the workhorse every sync Talk call site uses (run_coro(post_result_to_talk(...)), run_coro(poll_talk_conversations(config)), …). It lazily starts the process-global runtime on first use.
- get_talk_client(config) is a process-global persistent TalkClient singleton; every Talk delivery path pulls from it so they share one connection pool.
Invariant: every TalkClient method invocation must end up on the persistent loop (via run_coro), because the methods issue requests on the loop-bound client. There are no transient TalkClient(config) constructions left in daemon Talk paths. Email delivery stays on asyncio.run (sync SMTP, not httpx).
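The sync→async bridge described above can be sketched roughly as follows. This is a minimal illustration, not the code in src/istota/async_runtime.py: SketchRuntime, run_coro_sketch and current_loop_id are hypothetical names, and cancellation of in-flight coroutines and cleanup hooks are left out.

```python
import asyncio
import threading


class SketchRuntime:
    """Hypothetical stand-in for AsyncRuntime: one event loop on one daemon thread."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def submit(self, coro, *, timeout=None):
        # Hand the coroutine to the loop thread, then block the calling
        # (sync) thread until it finishes or the timeout expires.
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return future.result(timeout=timeout)

    def stop(self, timeout=10):
        # Simplified: the real stop() is described as cancelling in-flight
        # coroutines and running cleanup hooks before stopping the loop.
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join(timeout=timeout)


_runtime = None
_runtime_lock = threading.Lock()


def run_coro_sketch(coro, *, timeout=None):
    """Lazily start the process-global runtime, then run coro on its loop."""
    global _runtime
    with _runtime_lock:
        if _runtime is None:
            _runtime = SketchRuntime()
        runtime = _runtime
    return runtime.submit(coro, timeout=timeout)


async def current_loop_id():
    # Helper for demonstration: identifies the loop a coroutine runs on.
    return id(asyncio.get_running_loop())
```

Two successive run_coro_sketch(current_loop_id()) calls land on the same loop, which is the property that lets a loop-bound pooled client be reused across call sites.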
Main loop¶
    while not shutdown_requested:
        check_briefings()              # every briefing_check_interval (60s)
        check_scheduled_jobs()         # every briefing_check_interval
        check_sleep_cycles()           # every briefing_check_interval
        check_channel_sleep_cycles()   # every briefing_check_interval
        poll_emails()                  # every email_poll_interval (60s)
        organize_shared_files()        # every shared_file_check_interval (120s)
        poll_tasks_files()             # every tasks_file_poll_interval (30s)
        run_cleanup_checks()           # every briefing_check_interval
        check_briefing_triggers()      # every briefing_check_interval (NC-app trigger files)
        check_heartbeats()             # every heartbeat_check_interval (60s)
        pool.dispatch()                # spawn workers for users with pending tasks
        sleep(poll_interval)           # 2s
Talk polling runs in a separate daemon thread, started at scheduler launch.
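One way to picture the "every N seconds" gating inside a 2s loop tick is a small per-check timer. The sketch below is an assumption about the mechanism, not the scheduler's actual code; IntervalGate and run_tick are hypothetical names, and the injectable clock exists only to make the behavior easy to test.

```python
import time


class IntervalGate:
    """Reports True at most once per interval (hypothetical helper)."""

    def __init__(self, interval_s, clock=time.monotonic):
        self.interval_s = interval_s
        self._clock = clock
        self._last = None  # never fired yet

    def due(self):
        now = self._clock()
        if self._last is None or now - self._last >= self.interval_s:
            self._last = now
            return True
        return False


def run_tick(gates, checks):
    """Run every check whose gate is due; return the names that ran."""
    ran = []
    for name, gate in gates.items():
        if gate.due():
            checks[name]()
            ran.append(name)
    return ran
```

With a 60s gate for briefings and a 30s gate for TASKS.md polling, a tick at t=30 would run only the latter, while the loop itself keeps sleeping poll_interval between ticks.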
Worker pool¶
WorkerPool manages concurrent UserWorker threads with three-tier concurrency control:
Instance-level caps: max_foreground_workers (default 5) and max_background_workers (default 3) limit total concurrent workers by queue type. Dispatch is two-phase: foreground first, then background.
Per-user limits: user_max_foreground_workers (default 2) and user_max_background_workers (default 1) cap how many workers a single user can have. Individual users can override these in their config.
Per-channel gate: Before creating a task, the Talk poller checks if an active foreground task exists for the conversation. If so, it queues the message but sends "Still working on a previous request" as an immediate response.
Workers are keyed by (user_id, queue_type, slot). Each UserWorker is a thread that processes tasks serially, exiting after worker_idle_timeout (10s) of no tasks. Thread safety: fresh DB connections per call, new asyncio.run() event loop per worker, threading.Lock on the workers dict.
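The interaction between the instance-level cap and the per-user cap can be sketched for a single queue type as below. This is an illustrative assumption about how slots might be assigned (at most one new worker per user per pass, lowest free slot first); plan_dispatch and its parameters are hypothetical and do not come from WorkerPool.

```python
def plan_dispatch(pending_users, active, instance_cap, user_cap):
    """Pick (user_id, slot) pairs to start for one queue type.

    pending_users: user ids with pending tasks, in dispatch order.
    active: set of (user_id, slot) pairs that already have a worker.
    """
    running = set(active)
    started = []
    for user_id in pending_users:
        if len(running) >= instance_cap:
            break  # instance-level cap reached for this queue type
        used_slots = {slot for (uid, slot) in running if uid == user_id}
        for slot in range(user_cap):
            if slot not in used_slots:
                running.add((user_id, slot))
                started.append((user_id, slot))
                break  # assumption: one new worker per user per pass
    return started
```

Calling it once with the foreground caps and then once with the background caps would mirror the two-phase dispatch order described above.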
Task claiming¶
claim_task() uses atomic UPDATE...RETURNING with stale lock detection:
1. Fail old stale locked tasks (created > max_retry_age, locked > 30 min)
2. Release recent stale locks for retry
3. Fail old stuck running tasks
4. Release recent stuck running tasks for retry
5. Claim next pending: ORDER BY priority DESC, created_at ASC
The claim sets status='locked', locked_at=now, locked_by=worker_id atomically.
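The final claim step can be illustrated with a single UPDATE...RETURNING statement against SQLite (RETURNING needs SQLite 3.35 or newer). The table layout, the claim_next_sketch name and the sample rows below are assumptions made for the example; the real claim_task() also performs the stale-lock and stuck-task passes first.

```python
import sqlite3


def claim_next_sketch(conn, worker_id, now):
    """Atomically lock the best pending task; return (id, priority) or None."""
    cur = conn.execute(
        """
        UPDATE tasks
           SET status = 'locked', locked_at = ?, locked_by = ?
         WHERE id = (SELECT id FROM tasks
                      WHERE status = 'pending'
                      ORDER BY priority DESC, created_at ASC
                      LIMIT 1)
        RETURNING id, priority
        """,
        (now, worker_id),
    )
    rows = cur.fetchall()  # drain the statement before committing
    conn.commit()
    return rows[0] if rows else None


# Hypothetical minimal schema and data, just enough to exercise the query.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tasks (id INTEGER PRIMARY KEY, status TEXT, priority INTEGER,"
    " created_at TEXT, locked_at TEXT, locked_by TEXT)"
)
conn.executemany(
    "INSERT INTO tasks (status, priority, created_at) VALUES ('pending', ?, ?)",
    [(0, "2024-01-01T00:00:00"), (5, "2024-01-01T00:02:00"), (5, "2024-01-01T00:01:00")],
)
conn.commit()
```

Because selection and locking happen in one statement, two workers calling this concurrently cannot both receive the same row.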
Stuck-running detection by worker liveness (ISSUE-112)¶
Steps 3–5 (and the standalone fail_stuck_locked_running_tasks() maintenance pass) share _STUCK_RUNNING_PREDICATE, which decides "stuck" by worker liveness, not raw runtime. A running task is stuck when its last_heartbeat has been silent longer than worker_stuck_minutes (default 10); when no heartbeat was ever recorded it falls back to started_at older than task_timeout_minutes + grace. The running worker pings last_heartbeat every worker_heartbeat_seconds (default 60) via the _task_heartbeat context manager (db.touch_task_heartbeat), so a slow-but-alive worker — notably the in-process native brain, which has no killable PID — is never reclaimed, while a crashed worker is recovered in minutes. (This is distinct from the health-check heartbeat system in heartbeat.py.)
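Expressed in Python rather than SQL, the liveness rule above looks roughly like this. It is a sketch of the decision only: is_stuck_sketch is a hypothetical name, and the task_timeout_minutes and grace_minutes defaults are placeholders, since the source gives a default only for worker_stuck_minutes.

```python
from datetime import datetime, timedelta


def is_stuck_sketch(now, last_heartbeat, started_at,
                    worker_stuck_minutes=10,
                    task_timeout_minutes=30,   # placeholder value
                    grace_minutes=5):          # placeholder value
    """Decide whether a running task should be treated as stuck."""
    if last_heartbeat is not None:
        # Liveness: the worker has pinged before, so silence is the signal.
        return now - last_heartbeat > timedelta(minutes=worker_stuck_minutes)
    # No heartbeat ever recorded: fall back to raw runtime plus grace.
    return now - started_at > timedelta(minutes=task_timeout_minutes + grace_minutes)
```

A task that has been running for three hours but pinged one minute ago is therefore not stuck, while one whose last ping is eleven minutes old is.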
claim_task and every other Task-returning helper (get_task, get_pending_confirmation*, get_reply_parent_task, get_stale_pending_tasks, get_completed_*_since) route their SELECT/RETURNING through a single _TASK_COLUMNS constant in db.py. Adding a column means editing one place; missing columns now raise IndexError rather than silently returning None (the failure mode that masked the brief 027eb1a regression where task.skill came back unset and module-poller rows fell through to the LLM path with an empty prompt).
Task dispatch¶
process_one_task decides between three execution paths based on the task's columns:
| Task shape | Dispatcher | Notes |
|---|---|---|
| task.skill set | _execute_skill_task() | python -m istota.skills.<skill> subprocess. Trusted env via build_skill_env(list(skill_index), skill_index, ctx) over the full index, so co-declared vars (e.g. NC_URL declared on both files and nextcloud) reach the subprocess. No proxy split. |
| task.command set | _execute_command_task() | Shell command. Admin-gated (non-admin tasks refused at runtime + dropped at sync time). Same trusted env resolver as skill-tasks. JSON {"status":"error","error":"…"} envelopes on stdout are detected and surfaced as failures even when returncode is 0. |
| neither | LLM path via the brain | Default; runs through execute_task |
Auto-seeded _module.feeds.run_scheduled / _module.money.run_scheduled rows dispatch as skill-tasks. _purge_obsolete_skill_jobs removes rows whose skill name is no longer in the index.
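The three-way choice and the stdout error-envelope check can be sketched as two small functions. Both names are hypothetical, and checking skill before command is an assumption taken from the table order, not something the source states.

```python
import json


def choose_dispatcher_sketch(skill, command):
    """Return which execution path a task would take."""
    if skill:
        return "skill"      # would go to _execute_skill_task()
    if command:
        return "command"    # would go to _execute_command_task()
    return "llm"            # default brain path


def error_from_stdout_sketch(stdout):
    """Return the error text if stdout is a JSON error envelope, else None."""
    try:
        payload = json.loads(stdout.strip())
    except ValueError:
        return None  # not JSON at all: treat as ordinary output
    if isinstance(payload, dict) and payload.get("status") == "error":
        return str(payload.get("error", ""))
    return None
```

A command that exits with returncode 0 but prints {"status":"error","error":"quota exceeded"} would then be reported as a failure carrying that message.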
Task processing¶
process_one_task() handles the full lifecycle:
- Claim a task (with optional user_id filter)
- Update status to running
- Get user resources, send Talk acknowledgment, download attachments
- Call execute_task() -> (success, result, actions_taken, execution_trace)
- On success:
  - Check for malformed output (leaked tool-call XML) -> reclassify as failure
  - Check for confirmation request (regex pattern)
  - Update to completed
  - Index conversation for memory search
  - Deliver results
- On failure:
  - Check cancellation
  - Retry with exponential backoff if attempts remain (1, 4, 16 min delays)
  - Mark permanently failed after max attempts
Retry logic¶
Failed tasks retry with exponential backoff: 1 min, 4 min, 16 min (up to max_attempts, default 3). Transient API errors (5xx, 429) get 3 fast retries with 5s delay before counting against task attempts.
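The listed delays (1, 4, 16 minutes) are consistent with a base-4 exponential schedule. The formula below is inferred from those three values rather than taken from the code, and retry_delay_minutes_sketch is a hypothetical name.

```python
def retry_delay_minutes_sketch(failed_attempts):
    """Delay before the next try, given how many attempts have failed so far.

    Assumes a 1-indexed count and a base-4 schedule: 1, 4, 16, ...
    """
    if failed_attempts < 1:
        raise ValueError("failed_attempts must be >= 1")
    return 4 ** (failed_attempts - 1)
```

Under this reading, the fast 5s retries for transient 5xx/429 errors sit outside the function and do not advance failed_attempts.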
Task event streaming¶
One persistent, typed event stream per task feeds every output surface. process_one_task builds an EventWriter (events.py) per brain-path task and subscribes the in-process consumers (TalkEventSubscriber, LogChannelSubscriber, PushNotificationSubscriber) before passing the writer to execute_task(event_writer=…). The executor adapts the brain's StreamEvent stream into TaskEvents, persisted to the task_events table (WAL, shared scheduler ⇄ web). When the task reaches a non-retry terminal state the scheduler emits the terminal event (confirmation / result / cancelled / error + done) and calls writer.finish().
Retry continuity: on a retry-eligible failure the event log is kept (not wiped). The retry branch emits a progress_text "⏳ Attempt failed — retrying in N min…" notice, and the next attempt's EventWriter resumes seq from db.get_max_task_event_seq so it stays monotonic across attempts and a watching web client survives the retry instead of hanging on "Working…". The SSE / snapshot endpoints synthesize a terminal frame from the task row (web_app._synthetic_terminal_events) for any terminal-without-done gap (e.g. a crash that skipped finish()).
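The seq-resume behavior can be pictured with an in-memory log standing in for the task_events table. SketchEventWriter is a hypothetical stand-in for EventWriter, the max() lookup plays the role of db.get_max_task_event_seq, and starting seq at 1 is an assumption.

```python
class SketchEventWriter:
    """Appends events for one task, continuing from the highest stored seq."""

    def __init__(self, log, task_id):
        self.log = log
        self.task_id = task_id
        # Resume after the highest seq already stored for this task (0 if none).
        self.seq = max(
            (e["seq"] for e in log if e["task_id"] == task_id), default=0
        )

    def emit(self, kind, text=""):
        self.seq += 1
        self.log.append(
            {"task_id": self.task_id, "seq": self.seq, "kind": kind, "text": text}
        )
        return self.seq
```

A client that has already seen seq 2 from the first attempt can keep asking for events after 2 and will receive the second attempt's output without a reset.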
Config under [scheduler]: progress_show_tool_use, progress_show_text, event_log_enabled, stream_text_gate_chars, push_notification_threshold_seconds, push_notification_sources.
Delivery routing¶
Where a task's result goes is resolved by transport.routing.resolve_delivery_plan(config, task, registry), which turns a task into an ordered, deduplicated, channel-resolved list of destinations. Precedence: explicit output_target > reply-to-origin (interactive source types) > source-type default > drop. process_one_task builds the plan once and fans out to every push destination; stream destinations (REPL, web) contribute no push work — the task_events log is the delivery. Separately, a per-user purpose-keyed routing table (UserConfig.routing, purposes reply/alert/log/briefing/notification) routes notifications via notifications.send_notification(..., purpose=…). See the Transport abstraction and .claude/rules/transport.md.
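The precedence chain can be sketched as a short fall-through. This is not resolve_delivery_plan itself: the dict-shaped task, the origin field, the destination strings and the resolve_plan_sketch name are all hypothetical, and channel resolution via the registry is omitted.

```python
def resolve_plan_sketch(task, interactive_sources, source_defaults):
    """Return an ordered, deduplicated list of destinations for a task."""
    if task.get("output_target"):
        # 1. An explicit target always wins.
        candidates = [task["output_target"]]
    elif task.get("source_type") in interactive_sources and task.get("origin"):
        # 2. Interactive sources reply where the request came from.
        candidates = [task["origin"]]
    else:
        # 3. Source-type default, or 4. nothing at all (drop).
        candidates = source_defaults.get(task.get("source_type"), [])
    # dict.fromkeys keeps first-seen order while removing duplicates.
    return list(dict.fromkeys(candidates))
```

An empty list corresponds to the "drop" outcome: the task completes but nothing is pushed anywhere.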
Deferred DB operations¶
With the bubblewrap sandbox, the DB is read-only inside the subprocess. Claude and skill CLIs write JSON files to a writable temp dir. The scheduler processes these after successful completion. The handlers and the file envelope helper live in scheduler_deferred.py (extracted from scheduler.py for size and testability; scheduler.py keeps a re-export shim so from istota.scheduler import _process_deferred_* still works).
| File | Handler | Purpose |
|---|---|---|
| task_{id}_subtasks.json | _process_deferred_subtasks | Subtask creation (admin-only, depth- and rate-capped) |
| task_{id}_tracked_transactions.json | _process_deferred_tracking | Transaction dedup tracking |
| task_{id}_sent_emails.json | _process_deferred_sent_emails | Outbound email tracking for emissary thread matching |
| task_{id}_kv_ops.json | _process_deferred_kv_ops | KV store set/delete operations |
| task_{id}_kg_ops.json | _process_deferred_kg_ops | Knowledge-graph fact add/invalidate/delete (per-op commit) |
| task_{id}_user_alerts.json | _process_deferred_user_alerts | Alerts/notifications for the user's alerts channel |
| task_{id}_health_ops.json | _process_deferred_health_ops | Health module inserts/updates replayed against the per-user health.db |
| task_{id}_email_output.json | _deliver_deferred_email_output | Structured email reply (preferred over the legacy stdout-JSON parser) |
_load_deferred_json(user_temp_dir, task_id, suffix, expected_type=...) is the shared envelope helper: builds the path, exists-checks, parses JSON, validates the top-level shape (list or dict), and warns + unlinks on a malformed file. Each handler then runs its own business logic and unlinks at the call site so per-handler invariants (admin gate, depth gate, KG per-op commit) read cleanly.
_purge_deferred_files_for_retry clears the slate when a task is set back to pending_retry, so a non-idempotent op like a KG invalidate isn't replayed twice across attempts. _warn_unconsumed_deferred_files scans the user temp dir after the drain phase and logs WARN for files missing the task_ prefix or carrying an unknown suffix; the misnamed file is left on disk for inspection.
Identity fields (user_id, conversation_token) always come from the task, not from the JSON, to prevent spoofing.
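The envelope helper's described behavior (build path, exists-check, parse, shape-check, warn and unlink on a malformed file) might look roughly like the sketch below. It is not the code in scheduler_deferred.py: the function name, the return convention (data or None) and the suffix form ("kv_ops" rather than "_kv_ops.json") are assumptions.

```python
import json
import logging
from pathlib import Path

log = logging.getLogger("deferred_sketch")


def load_deferred_json_sketch(user_temp_dir, task_id, suffix, expected_type=list):
    """Return the parsed payload, or None if the file is absent or malformed."""
    path = Path(user_temp_dir) / f"task_{task_id}_{suffix}.json"
    if not path.exists():
        return None
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        data = None  # unreadable or invalid JSON
    if not isinstance(data, expected_type):
        log.warning("malformed deferred file %s, removing", path.name)
        path.unlink(missing_ok=True)
        return None
    # Valid files are left in place; the handler unlinks after its own logic.
    return data
```

A handler would then take user_id and conversation_token from the task row and use the returned payload only for the operation details.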
Cleanup¶
Runs every briefing_check_interval:
- Cancel stale confirmations after 120 min, notify user
- Log warnings for tasks pending longer than 30 min
- Auto-fail tasks pending longer than stale_pending_fail_hours (2)
- Delete completed tasks older than task_retention_days (7)
- Delete processed emails from IMAP older than email_retention_days (7)
- Delete old temp files
Poller intervals¶
| Poller | Default interval | Config key |
|---|---|---|
| Task queue | 2s | poll_interval |
| Pending-task dispatch sub-tick | 0.5s | dispatch_interval |
| Talk conversations | 10s | talk_poll_interval |
| Email (IMAP) | 60s | email_poll_interval |
| Briefings/jobs/sleep/cleanup | 60s | briefing_check_interval |
| TASKS.md files | 30s | tasks_file_poll_interval |
| Shared files | 120s | shared_file_check_interval |
| Heartbeats | 60s | heartbeat_check_interval |
| SQLite health (quick_check + self-heal REINDEX) | 86400s (24h) | db_health_check_interval |
dispatch_interval decouples cold pending-task pickup latency from the interval-gated checks: the main loop runs pool.dispatch() on this sub-tick cadence without re-running the per-subsystem checks. Setting it to 0, or to a value ≥ poll_interval, restores the legacy behavior of one dispatch per tick.
cron_max_staleness_minutes (default 60) is the insertion-time staleness gate for check_scheduled_jobs / check_briefings: after a long outage it skips the catch-up insert and resumes from the next future fire, which suppresses thundering-herd catch-up.
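The staleness gate reduces to one comparison at insertion time. The sketch below assumes the gate compares the scheduled fire time against the current time and treats the boundary as still fresh; should_insert_fire_sketch is a hypothetical name.

```python
from datetime import datetime, timedelta


def should_insert_fire_sketch(scheduled_for, now, max_staleness_minutes=60):
    """True if a missed fire is recent enough to still be inserted as a task."""
    return now - scheduled_for <= timedelta(minutes=max_staleness_minutes)
```

After a six-hour outage, an hourly job's five oldest missed fires would be skipped under this rule, leaving at most the most recent one eligible before the schedule resumes normally.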