Question: Actor Persistence / Stateful actors #254
Replies: 2 comments 1 reply
Great question. This is a deliberate design decision - Ergo Framework follows the Erlang/OTP philosophy where actor processes are ephemeral and state lives outside the actor. There is no built-in "Persistent Actor" pattern (like Akka Persistence), and this is intentional. Here's why, and what to do instead.

### Why there's no built-in event sourcing / persistent actors

The Erlang/OTP philosophy (which Ergo follows) treats actors as computation units, not storage units. The "let it crash" model assumes that a process can be killed and restarted with a clean slate at any moment, which only works if any state worth keeping lives outside the process.
In Akka, persistent actors are convenient but come with well-known pain points - schema evolution of persisted events, journal compaction, snapshot management, recovery time growing with event count. Ergo avoids inheriting these problems by keeping the actor model clean.

### The recommended approach for your use case

For long-running computations that must survive restarts/deployments in K8s, the pattern is external queue + checkpointing.

### Pattern 1: External queue with explicit acknowledgment

Instead of sending messages directly to the worker actor, put work items into an external durable queue (NATS JetStream, Kafka, Redis Streams, PostgreSQL with FOR UPDATE SKIP LOCKED, etc.). The actor pulls work from the queue and only acknowledges completion after processing finishes. On restart (after a deploy), the supervisor restarts the worker, and in Init() it simply reconnects to the queue; any unacknowledged items are redelivered. This is the simplest and most reliable pattern. It completely avoids the "how do we replay stored messages" problem. (A minimal sketch of this pattern appears at the end of this answer.)

### Pattern 2: Checkpoint-based recovery for multi-step computation

If the computation is genuinely long (seconds to minutes) and can be broken into steps, checkpoint after each step and resume from the last checkpoint on restart.
```go
type AnalyticsWorker struct {
    act.Actor
    store CheckpointStore // your backend (Redis, PostgreSQL, etc.)
}

func (w *AnalyticsWorker) Init(args ...any) error {
    // On restart, check if there's an incomplete computation
    checkpoint, err := w.store.LoadCheckpoint(w.Name())
    if err != nil {
        return err
    }
    if checkpoint != nil {
        // Resume from where we left off by sending ourselves a message
        w.Send(w.PID(), MessageResumeComputation{
            JobID:       checkpoint.JobID,
            Step:        checkpoint.Step,
            PartialData: checkpoint.Data,
        })
    }
    return nil
}
func (w *AnalyticsWorker) HandleMessage(from gen.PID, message any) error {
    switch msg := message.(type) {
    case MessageStartComputation:
        // Step 1: save that we accepted the job (check the error so we
        // never run work we can't recover)
        if err := w.store.SaveCheckpoint(w.Name(), Checkpoint{
            JobID: msg.JobID, Step: 1, Data: nil,
        }); err != nil {
            return err
        }
        // ... do step 1 ...
        result1 := computeStep1(msg.Input)
        // Step 2: checkpoint after step 1
        if err := w.store.SaveCheckpoint(w.Name(), Checkpoint{
            JobID: msg.JobID, Step: 2, Data: result1,
        }); err != nil {
            return err
        }
        // ... do step 2 ...
        result2 := computeStep2(result1)
        // Done — clear checkpoint
        w.store.DeleteCheckpoint(w.Name())
        publishResult(result2)
    case MessageResumeComputation:
        // Pick up from the checkpointed step
        switch msg.Step {
        case 2:
            result2 := computeStep2(msg.PartialData)
            w.store.DeleteCheckpoint(w.Name())
            publishResult(result2)
        default:
            // Step 1 wasn't completed - restart from scratch
            // (the original input would also need to be checkpointed)
            w.Send(w.PID(), MessageStartComputation{JobID: msg.JobID /* ...original input... */})
        }
    }
    return nil
}
```

The key insight: the self-send in Init() turns recovery into ordinary message handling - the resume message flows through the same sequential mailbox as everything else, so no special replay machinery is needed.

### Pattern 3: Inbox pattern with external persistence (closest to Akka Persistence)

If you really want an Akka-like "persist-then-process" guarantee, persist every message before processing it and mark it processed afterwards.
```go
type DurableInbox struct {
    act.Actor
    store    MessageStore
    sequence uint64
}

func (d *DurableInbox) Init(args ...any) error {
    // Load all unprocessed messages from the store
    stored, err := d.store.LoadUnprocessed(d.Name())
    if err != nil {
        return err
    }
    // Replay them in order; StoredMessage is distinct from MessageNewWork,
    // so replayed work is not persisted a second time
    for _, s := range stored {
        d.Send(d.PID(), s)
    }
    d.sequence = d.store.LastSequence(d.Name())
    return nil
}

func (d *DurableInbox) HandleMessage(from gen.PID, message any) error {
    switch msg := message.(type) {
    case MessageNewWork:
        // Persist first, then process (Akka-style guarantee)
        d.sequence++
        if err := d.store.Persist(d.Name(), d.sequence, msg); err != nil {
            // Can't persist — reject the message; the returned error
            // terminates the actor and the supervisor will restart it
            return err
        }
        // Now process
        publishResult(doWork(msg))
        // Mark as processed only after the work is done
        d.store.MarkProcessed(d.Name(), d.sequence)
    case StoredMessage:
        // Replayed after a restart: already persisted, just process
        publishResult(doWork(msg.Work))
        d.store.MarkProcessed(d.Name(), msg.Seq)
    }
    return nil
}
```

### Answering your specific questions

**"How do we handover to stored messages after persisting?"**

You don't need a special handover. Once state is persisted synchronously in the handler, the actor continues processing the next message from its mailbox naturally. The sequential, single-goroutine nature of Ergo actors gives you this guarantee for free - HandleMessage finishes (including the persist call) before the next message is dequeued.

**"On Init() how do we replay stored messages?"**

Use the self-send approach shown in Patterns 2 and 3: load whatever is outstanding from the store and send it to your own PID; those messages are then handled in order like any other mailbox traffic.

**"The processing of the next command will not start until the state has been successfully stored"**

This is automatically guaranteed by the actor model. Since all message handling is sequential (single goroutine), if your HandleMessage does store.Save() synchronously before returning, the next message won't be dequeued until that call has completed.

### Recommendation for K8s specifically

For K8s rolling deployments, Pattern 1 (external queue) is strongly recommended: the queue outlives any individual pod, unacknowledged work is redelivered automatically after a restart, and the actor itself stays stateless.
The external queue approach is what most production Erlang/Elixir systems use too (with RabbitMQ, Kafka, etc.) - persistent actors in those ecosystems are typically reserved for aggregate roots in event-sourced domains, not for computation pipelines.
---
Thanks for these great insights, I'll play around with the options and see which best suits our use case (we can even mix and match them, depending on what's at hand). Great library, and I am looking forward to the release of v3.3.0.
---
What is the recommended way of implementing stateful actors that can recover their state after an event such as an application upgrade?
Example use case:
A long-running task (maybe a heavy computation that takes several seconds, e.g. part of an analytics pipeline); a message has been received by the worker/actor, but in the middle of processing a new version of the app has been deployed (picture this happening in a K8s environment). Naturally, the actor's state will be lost.
What would be the best way to go about this?
Weird workarounds (not yet tested):
- Persist the message as soon as the HandleMessageName callback is invoked, then split the handling ("Split-Handle") - still not that hard. But this presents a problem, as in classic Akka: in such a case, how do we handover to stored messages after persisting?
Also, on Init() how do we replay stored messages to the actor for processing after a fresh start?
I am sure that you've thought about this and decided not to implement it for a reason, but I would be glad if you pointed me in the right direction so that I can implement a solution that would help with this.