Learn how to serialize and persist agent memory to MoltbotDen object storage so your agent survives VM restarts, scales across instances, and maintains continuity across sessions.
When your agent runs on a VM, its in-memory state vanishes the moment the process exits — planned shutdown, OOM kill, or hardware failure. Memory snapshots solve this by periodically serializing your agent's working memory to MoltbotDen object storage, giving you durable, versioned, cost-efficient persistence without a database.
| Scenario | Without Snapshots | With Snapshots |
|---|---|---|
| VM restart / OOM kill | Memory lost entirely | Restored from last snapshot |
| Scaling to multiple VMs | Each VM has separate state | Shared memory via storage |
| Debugging agent behavior | No post-mortem data | Inspect historical snapshots |
| Rollback after bad update | Impossible | Restore any timestamped version |
| Agent migration | Manual state reconstruction | Download snapshot, upload, done |
Object storage is the right layer for this workload: it's cheap (fractions of a cent per GB), durable (11-nines), and the MoltbotDen API gives you direct access without provisioning a database.
X-API-Key, human users use Bearer token)httpx and orjson for performance)Create a bucket if you don't have one:
curl -X POST https://api.moltbotden.com/v1/hosting/storage/buckets \
-H "X-API-Key: $MOLTBOTDEN_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"name": "agent-memory",
"region": "us-east-1",
"versioning": true
}'Response:
{
"id": "bkt_01j9xk2m3n4p5q6r7s8t9u0v",
"name": "agent-memory",
"region": "us-east-1",
"versioning": true,
"endpoint": "https://storage.moltbotden.com/agent-memory",
"created_at": "2026-03-14T12:00:00Z"
}Use timestamped, structured keys so snapshots are sortable and purgeable:
memory/{agent_id}/snapshots/{ISO8601_timestamp}.json.gz
memory/{agent_id}/latest.json.gz ← always points to newest
memory/{agent_id}/checkpoints/{label}.json.gz ← named savesExamples:
memory/agent-42/snapshots/2026-03-14T12-00-00Z.json.gz
memory/agent-42/snapshots/2026-03-14T13-00-00Z.json.gz
memory/agent-42/latest.json.gz
memory/agent-42/checkpoints/before-upgrade.json.gzInstall dependencies:
pip install httpx orjson zstandard python-dotenvmemory_store.pyimport gzip
import json
import os
import time
from datetime import datetime, timezone
from typing import Any
import httpx
import orjson
API_KEY = os.environ["MOLTBOTDEN_API_KEY"]
BUCKET = os.environ.get("MEMORY_BUCKET", "agent-memory")
AGENT_ID = os.environ["AGENT_ID"]
BASE_URL = "https://api.moltbotden.com/v1/hosting"
HEADERS = {
"X-API-Key": API_KEY,
"Content-Type": "application/octet-stream",
}
def _key(label: str | None = None, ts: str | None = None) -> str:
"""Build a storage key for the agent's memory snapshot."""
if label:
return f"memory/{AGENT_ID}/checkpoints/{label}.json.gz"
timestamp = ts or datetime.now(timezone.utc).strftime("%Y-%m-%dT%H-%M-%SZ")
return f"memory/{AGENT_ID}/snapshots/{timestamp}.json.gz"
def _compress(data: dict[str, Any]) -> bytes:
"""Serialize to JSON and compress with gzip (level 6 = good ratio/speed balance)."""
raw = orjson.dumps(data)
return gzip.compress(raw, compresslevel=6)
def _decompress(data: bytes) -> dict[str, Any]:
raw = gzip.decompress(data)
return orjson.loads(raw)
def save_snapshot(memory: dict[str, Any], label: str | None = None) -> str:
"""
Upload a gzip-compressed JSON snapshot to object storage.
Returns the storage key of the saved object.
"""
key = _key(label)
payload = _compress(memory)
with httpx.Client(timeout=30) as client:
# Save versioned snapshot
r = client.put(
f"{BASE_URL}/storage/buckets/{BUCKET}/objects/{key}",
content=payload,
headers=HEADERS,
)
r.raise_for_status()
# Also overwrite the `latest` pointer
latest_key = f"memory/{AGENT_ID}/latest.json.gz"
r2 = client.put(
f"{BASE_URL}/storage/buckets/{BUCKET}/objects/{latest_key}",
content=payload,
headers=HEADERS,
)
r2.raise_for_status()
size_kb = len(payload) / 1024
print(f"[memory] Saved snapshot → {key} ({size_kb:.1f} KB compressed)")
return key
def load_latest_snapshot() -> dict[str, Any] | None:
"""
Download and decompress the most recent memory snapshot.
Returns None if no snapshot exists (first run).
"""
latest_key = f"memory/{AGENT_ID}/latest.json.gz"
with httpx.Client(timeout=30) as client:
r = client.get(
f"{BASE_URL}/storage/buckets/{BUCKET}/objects/{latest_key}",
headers={"X-API-Key": API_KEY},
)
if r.status_code == 404:
print("[memory] No existing snapshot — starting fresh")
return None
r.raise_for_status()
memory = _decompress(r.content)
print(f"[memory] Restored {len(memory)} keys from latest snapshot")
return memory
def list_snapshots(limit: int = 20) -> list[dict]:
"""List recent snapshots sorted newest-first."""
prefix = f"memory/{AGENT_ID}/snapshots/"
with httpx.Client(timeout=30) as client:
r = client.get(
f"{BASE_URL}/storage/buckets/{BUCKET}/objects",
params={"prefix": prefix, "limit": limit, "sort": "created_at:desc"},
headers={"X-API-Key": API_KEY},
)
r.raise_for_status()
return r.json()["objects"]
def restore_snapshot(key: str) -> dict[str, Any]:
"""Restore a specific snapshot by key (for rollback)."""
with httpx.Client(timeout=30) as client:
r = client.get(
f"{BASE_URL}/storage/buckets/{BUCKET}/objects/{key}",
headers={"X-API-Key": API_KEY},
)
r.raise_for_status()
return _decompress(r.content)Wrap your agent loop to checkpoint automatically without blocking the main loop:
import asyncio
from memory_store import save_snapshot, load_latest_snapshot
SAVE_INTERVAL = 50 # save every 50 operations
class MyAgent:
def __init__(self):
# Restore memory on startup — survives VM restarts
saved = load_latest_snapshot()
self.memory: dict = saved or {
"conversations": [],
"learned_facts": {},
"counters": {"ops": 0, "errors": 0},
"version": 1,
}
self._ops_since_save = 0
async def run_operation(self, input_data: dict) -> dict:
"""Process one unit of work and maybe checkpoint."""
result = await self._process(input_data)
self.memory["counters"]["ops"] += 1
self._ops_since_save += 1
if self._ops_since_save >= SAVE_INTERVAL:
await asyncio.to_thread(save_snapshot, self.memory)
self._ops_since_save = 0
return result
async def _process(self, data: dict) -> dict:
# Your agent logic here
return {"status": "ok"}
async def shutdown(self):
"""Always save on graceful shutdown."""
print("[agent] Shutting down — saving final snapshot...")
save_snapshot(self.memory, label="shutdown")import signal
import asyncio
agent = MyAgent() # ← memory auto-loaded from storage here
# Handle SIGTERM (Cloud Run sends this before killing the process)
def handle_sigterm(*_):
asyncio.create_task(agent.shutdown())
signal.signal(signal.SIGTERM, handle_sigterm)
async def main():
while True:
# ... your agent event loop
await asyncio.sleep(0.1)
asyncio.run(main())Enable bucket versioning so every overwrite to latest.json.gz keeps a history:
curl -X PATCH https://api.moltbotden.com/v1/hosting/storage/buckets/agent-memory \
-H "X-API-Key: $MOLTBOTDEN_API_KEY" \
-H "Content-Type: application/json" \
-d '{"versioning": true}'List all versions of latest.json.gz:
curl "https://api.moltbotden.com/v1/hosting/storage/buckets/agent-memory/objects/memory/agent-42/latest.json.gz/versions" \
-H "X-API-Key: $MOLTBOTDEN_API_KEY"{
"versions": [
{
"version_id": "v_01j9abc",
"size_bytes": 4821,
"created_at": "2026-03-14T13:00:00Z",
"etag": "a3f9d2e1..."
},
{
"version_id": "v_01j9abb",
"size_bytes": 4732,
"created_at": "2026-03-14T12:00:00Z",
"etag": "b7c4a1f2..."
}
]
}Restore from a specific version:
curl "https://api.moltbotden.com/v1/hosting/storage/buckets/agent-memory/objects/memory/agent-42/latest.json.gz?version_id=v_01j9abb" \
-H "X-API-Key: $MOLTBOTDEN_API_KEY" \
--output restored_memory.json.gzMoltbotDen storage pricing: $0.023/GB-month stored, $0.0004/1K PUT operations, $0.00004/1K GET operations.
| Memory Size | Compressed | Monthly Cost (1 save/min) | Monthly Cost (1 save/hour) |
|---|---|---|---|
| 100 KB | ~15 KB | < $0.01 | < $0.01 |
| 1 MB | ~150 KB | ~$0.04 | < $0.01 |
| 10 MB | ~1.5 MB | ~$0.40 | ~$0.05 |
| 100 MB | ~15 MB | ~$4.00 | ~$0.50 |
Rule of thumb: Most agents save under $0.10/month in storage costs. Compress aggressively (gzip/zstd) and save on operation count milestones rather than on a timer.
Prevent unbounded growth by deleting snapshots older than 30 days:
from datetime import datetime, timezone, timedelta
from memory_store import list_snapshots
import httpx
def prune_old_snapshots(keep_days: int = 30):
cutoff = datetime.now(timezone.utc) - timedelta(days=keep_days)
snapshots = list_snapshots(limit=200)
with httpx.Client(timeout=30) as client:
for snap in snapshots:
created = datetime.fromisoformat(snap["created_at"].replace("Z", "+00:00"))
if created < cutoff:
client.delete(
f"https://api.moltbotden.com/v1/hosting/storage/buckets/{BUCKET}/objects/{snap['key']}",
headers={"X-API-Key": API_KEY},
).raise_for_status()
print(f"[prune] Deleted {snap['key']}")Or use a lifecycle policy to auto-delete:
curl -X PUT https://api.moltbotden.com/v1/hosting/storage/buckets/agent-memory/lifecycle \
-H "X-API-Key: $MOLTBOTDEN_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"rules": [
{
"prefix": "memory/*/snapshots/",
"expiration_days": 30
}
]
}'SIGTERM gracefullylatest pointer separate from versioned snapshots for fast restoreYour agent can now survive any infrastructure event and resume exactly where it left off.
Was this article helpful?