Using Effect RPC for Workers in TypeScript
This blog post is also available as a video on YouTube above.
Workers are pretty neat. They let you run code off the main thread, keeping your UI or server responsive during heavy computation. But actually using them? The developer experience can be… less than ideal.
If you’ve worked with standard Web Workers, for example, you’ve probably run into some of these headaches:
- Manual message passing: You’re stuck with postMessage and blobs of loosely-typed data. Refactoring is risky, and you have to invent your own discriminators.
- No automatic error propagation: If something throws in the worker, you have to invent your own serializable error protocol and handle it everywhere.
- Messy async patterns: You need to wire up your own request/response system, track IDs, and avoid callback hell.
- Type safety is on you: You hope the data you sent matches what the worker expects (and vice-versa), but you only find out for sure when things break at runtime.
- No built-in concurrency control: If multiple parts of your app use the same worker, you can easily overload it. Distributing work across a pool? Good luck.
Basically, the standard Worker API leaves you to rebuild a lot of infrastructure for reliable, type-safe async communication—stuff that just feels… backwards.
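To make those headaches concrete, here is a rough sketch of the request/response plumbing you typically end up writing by hand. It is plain TypeScript with an in-memory stand-in for the worker so it runs anywhere; the FakeWorkerPort type, the message shapes, and every name below are hypothetical and exist only for illustration.

```typescript
// Hypothetical message shapes: you have to invent the discriminators yourself.
type WorkerRequest = { id: number; kind: "filterData"; data: number[]; threshold: number };
type WorkerResponse =
  | { id: number; ok: true; value: number[] }
  | { id: number; ok: false; error: string };

// In-memory stand-in for a worker (an assumption for this sketch).
// A real Worker would use postMessage/onmessage across threads instead.
type FakeWorkerPort = {
  postMessage: (msg: WorkerRequest) => void;
  onmessage: ((msg: WorkerResponse) => void) | null;
};

const makeFakeWorker = (): FakeWorkerPort => {
  const port: FakeWorkerPort = {
    onmessage: null,
    postMessage: (msg) => {
      // Simulate the serialization boundary: only plain data crosses it.
      const req: WorkerRequest = JSON.parse(JSON.stringify(msg));
      const res: WorkerResponse =
        req.threshold < 0
          ? { id: req.id, ok: false, error: "Threshold cannot be negative" }
          : { id: req.id, ok: true, value: req.data.filter((n) => n > req.threshold) };
      port.onmessage?.(res);
    }
  };
  return port;
};

// The hand-rolled client: track pending requests by ID and correlate replies.
const makeManualClient = (port: FakeWorkerPort) => {
  let nextId = 0;
  const pending = new Map<number, (res: WorkerResponse) => void>();
  port.onmessage = (res) => {
    const onDone = pending.get(res.id);
    pending.delete(res.id);
    onDone?.(res);
  };
  return {
    filterData: (data: number[], threshold: number, onDone: (res: WorkerResponse) => void) => {
      const id = nextId++;
      pending.set(id, onDone);
      port.postMessage({ id, kind: "filterData", data, threshold });
    },
    pendingCount: () => pending.size
  };
};

// Usage: every caller has to check the `ok` flag by hand.
const manualClient = makeManualClient(makeFakeWorker());
const replies: WorkerResponse[] = [];
manualClient.filterData([1, 5, 10], 4, (res) => replies.push(res));
manualClient.filterData([1, 2, 3], -1, (res) => replies.push(res));
```

Even this toy version needs ID bookkeeping, a made-up error format, and callbacks, and nothing checks at runtime that the data crossing the boundary matches the declared types.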
This is where Effect RPC (@effect/rpc) comes in handy, especially if you’re already using the Effect ecosystem.
Effect RPC, along with @effect/platform, gives you a structured way to talk between the main thread and workers. You define a clear contract using schemas, and Effect handles the messy parts: serialization, routing requests, propagating errors, and even managing concurrency, all with type safety from end to end. Instead of manual postMessage chaos, you get a clean client that makes calling your worker feel like any other asynchronous operation.
The cool thing? Effect RPC is protocol agnostic. You can use it for Web Workers, WebSockets, HTTP, etc. Whatever you need.
Let’s walk through how to set up a much nicer worker experience using Effect RPC.
Step 1: Define the Contract (Get Everyone on the Same Page)
First up, we define the “API” for our worker. What can it do? What data does it need? What does it return? What errors can pop up? We use @effect/rpc and effect/Schema for this. Effect RPC asks you to be explicit about this contract, which gives you stronger guarantees than inferred types.
import * as Rpc from "@effect/rpc/Rpc";
import * as RpcGroup from "@effect/rpc/RpcGroup";
import * as Schema from "effect/Schema";

// Define potential errors using Schema.TaggedError
// (makes them serializable and easy to handle)
export class FilterError extends Schema.TaggedError<FilterError>()(
  "FilterError",
  {
    message: Schema.String
  }
) {}

// Define our RPC interface using RpcGroup.make
export class WorkerRpc extends RpcGroup.make(
  // Define the 'filterData' procedure
  Rpc.make("filterData", {
    // What a successful call returns
    success: Schema.Array(Schema.Number),
    // What typed errors this call can fail with
    error: FilterError,
    // What data this call needs
    payload: {
      data: Schema.Array(Schema.Number),
      threshold: Schema.Number
    }
  }),
  // Define the 'calculatePrimes' procedure
  Rpc.make("calculatePrimes", {
    success: Schema.Number,
    // This one isn't expected to fail with a specific *typed* error
    error: Schema.Never,
    payload: {
      upperBound: Schema.Number
    }
  })
) {}
So what’s happening here?
- RpcGroup.make is like a container for all the procedures our worker will expose.
- Rpc.make defines each specific procedure (filterData, calculatePrimes).
- Schema spells out the exact shape of the data: inputs (payload), success results (success), and expected errors (error).
- Schema.TaggedError is great for defining custom error types that are also serializable.
- Even if a function isn’t supposed to fail in a predictable way, we mark it (error: Schema.Never). If something unexpected does go wrong (like a bug), Effect calls that a “defect” and handles it separately.
- This WorkerRpc definition acts as the single source of truth, keeping our main thread and worker thread code in sync.
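The split between expected errors and defects can be pictured with a small plain-TypeScript sketch. The wire format and all names here (WireFailure, classifyFailure, and so on) are made up for illustration; Effect RPC’s actual encoding is different and is handled for you.

```typescript
// Hypothetical data shape for the typed error declared in the contract.
type FilterErrorData = { _tag: "FilterError"; message: string };

// Hypothetical wire format for failures (not Effect RPC's real encoding).
type WireFailure =
  | { kind: "expected"; error: FilterErrorData } // declared in the contract
  | { kind: "defect"; description: string }; // anything else, e.g. a bug

// Runtime check: does an unknown value look like a FilterError?
const isFilterErrorData = (u: unknown): u is FilterErrorData =>
  typeof u === "object" &&
  u !== null &&
  (u as { _tag?: unknown })._tag === "FilterError" &&
  typeof (u as { message?: unknown }).message === "string";

// Classify whatever a handler produced as a failure before it crosses the boundary.
const classifyFailure = (thrown: unknown): WireFailure =>
  isFilterErrorData(thrown)
    ? { kind: "expected", error: { _tag: "FilterError", message: thrown.message } }
    : { kind: "defect", description: String(thrown) };

const expectedFailure = classifyFailure({
  _tag: "FilterError",
  message: "Threshold cannot be negative"
});
const defectFailure = classifyFailure(new TypeError("data.filter is not a function"));
```

The point of the distinction is that callers can recover from the expected branch by tag, while the defect branch signals a bug that usually should not be handled as ordinary control flow.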
If you haven’t played with effect/Schema before, you’re missing out. It’s not just a decoder: it’s fully bidirectional. That means it both decodes and encodes, so Effect uses the exact same schema to serialize and deserialize your data. You get full runtime safety, end to end.
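If “bidirectional” sounds abstract, the idea can be sketched without any library. The Codec type below is a hypothetical, hand-rolled stand-in and not the effect/Schema API; it only shows what it means for one definition to drive both directions.

```typescript
// A hypothetical hand-rolled codec: one definition, two directions.
// effect/Schema provides this (and much more); this is only an illustration.
type Codec<Decoded, Encoded> = {
  decode: (input: unknown) => Decoded; // validate and convert wire data
  encode: (value: Decoded) => Encoded; // convert back to wire data
};

// Wire format and in-memory type are the same here, but decode still validates.
const numberArrayCodec: Codec<ReadonlyArray<number>, ReadonlyArray<number>> = {
  decode: (input) => {
    if (!Array.isArray(input) || !input.every((n) => typeof n === "number")) {
      throw new Error("Expected an array of numbers");
    }
    return input;
  },
  encode: (value) => value
};

// A codec whose wire format (ISO string) differs from its in-memory type (Date).
const dateCodec: Codec<Date, string> = {
  decode: (input) => {
    if (typeof input !== "string" || isNaN(Date.parse(input))) {
      throw new Error("Expected an ISO date string");
    }
    return new Date(input);
  },
  encode: (value) => value.toISOString()
};

// Round trip: what one side encodes, the other side can decode.
const wireDate = dateCodec.encode(dateCodec.decode("2024-01-02T03:04:05.000Z"));
const decodedNumbers = numberArrayCodec.decode(JSON.parse("[1, 2, 3]"));
```

Because both sides share the same definition, the sender and the receiver cannot drift apart on the wire format.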
Step 2: Implement the Worker Logic (The Server Side)
Now, let’s write the code that actually runs inside the worker. Instead of messy onmessage handlers, we just provide implementations for the procedures we defined in WorkerRpc, and let Effect’s RpcServer handle the communication details.
import * as BrowserRuntime from "@effect/platform-browser/BrowserRuntime";
import * as BrowserWorkerRunner from "@effect/platform-browser/BrowserWorkerRunner";
import * as RpcServer from "@effect/rpc/RpcServer";
import * as Effect from "effect/Effect";
import * as Layer from "effect/Layer";
import { FilterError, WorkerRpc } from "./worker-contract";

const isPrime = (num: number): boolean => {
  if (num <= 1) return false;
  if (num <= 3) return true;
  if (num % 2 === 0 || num % 3 === 0) return false;
  for (let i = 5; i * i <= num; i = i + 6) {
    if (num % i === 0 || num % (i + 2) === 0) return false;
  }
  return true;
};

// Implement the handlers for our RpcGroup procedures using an Effect Layer
// Layers are Effect's way of handling dependency injection and setup
// Think of them as constructors that you can retry, merge, and more
const Live = WorkerRpc.toLayer(
  Effect.gen(function* () {
    // This Effect runs once when the worker initializes
    yield* Effect.logInfo("Worker started");
    // Return an object where keys match our procedure names
    return {
      // Handler for 'filterData'
      filterData: (req) =>
        Effect.gen(function* () {
          yield* Effect.logInfo(
            `Worker received request to filter ${req.data.length} items with threshold ${req.threshold}`
          );
          if (req.threshold < 0) {
            yield* Effect.logError("Worker received invalid threshold");
            // To fail with a typed error, we could wrap it in Effect.fail.
            // Here we yield the error instance directly, which works
            // because TaggedError instances are yieldable.
            return yield* new FilterError({
              message: "Threshold cannot be negative"
            });
          }
          const filtered = req.data.filter((n) => n > req.threshold);
          yield* Effect.logInfo(
            `Worker finished filtering. Returning ${filtered.length} items.`
          );
          return filtered;
        }),
      // Handler for 'calculatePrimes'
      calculatePrimes: ({ upperBound }) =>
        Effect.gen(function* () {
          yield* Effect.logInfo(
            `Worker received request to calculate primes up to ${upperBound}`
          );
          let count = 0;
          for (let i = 2; i <= upperBound; i++) {
            if (isPrime(i)) {
              count += 1;
            }
          }
          yield* Effect.logInfo(
            `Worker finished calculating primes. Found ${count} primes.`
          );
          return count;
        })
    };
  })
);

// Define our RPC Server Configuration
const RpcWorkerServer = RpcServer.layer(WorkerRpc).pipe(
  // Plug in our actual implementations
  Layer.provide(Live),
  // Tell the server to use the standard Worker communication channel
  Layer.provide(RpcServer.layerProtocolWorkerRunner),
  // Provide the browser-specific parts needed to run in a worker context
  Layer.provide(BrowserWorkerRunner.layer)
);

// Start the worker runtime
BrowserRuntime.runMain(
  // BrowserWorkerRunner.launch sets everything up and handles cleanup
  BrowserWorkerRunner.launch(RpcWorkerServer).pipe(
    Effect.tapErrorCause((cause) => Effect.logError("[Worker] crashed", cause))
  )
);
Let’s break down the worker setup:
- Handlers (Live): We use WorkerRpc.toLayer(...) to create a Layer that provides the implementations for our defined RPCs (filterData, calculatePrimes). The structure mirrors the RpcGroup definition.
- RPC Server Configuration (RpcWorkerServer):
  - RpcServer.layer(WorkerRpc): Creates the core server logic tied to our contract.
  - RpcServer.layerProtocolWorkerRunner: This is the magic piece. It tells the RpcServer to listen for and respond to messages using the standard worker postMessage/onmessage mechanism, mediated by @effect/platform/WorkerRunner.
  - BrowserWorkerRunner.layer: The layerProtocolWorkerRunner needs a PlatformRunner service, which this provides for the browser context. This means layerProtocolWorkerRunner is platform-agnostic. You can support any runner you want (e.g. Node.js, Bun, etc.) with just one line of code.
- Running the Worker: BrowserWorkerRunner.launch(RpcWorkerServer) starts the server layer and integrates it with the worker’s lifecycle, ensuring proper setup and teardown.
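Conceptually, the server layer saves you from writing a dispatcher like the one below. This is a simplified plain-TypeScript sketch, not how RpcServer is implemented; the contract type, message shapes, and all names ending in Sketch are assumptions made up for the example.

```typescript
// Hypothetical contract shape: procedure name -> payload and success types.
type ContractSketch = {
  filterData: { payload: { data: number[]; threshold: number }; success: number[] };
  calculatePrimes: { payload: { upperBound: number }; success: number };
};

// One handler per procedure; a missing or mistyped handler is a compile error.
type HandlersSketch = {
  [K in keyof ContractSketch]: (
    payload: ContractSketch[K]["payload"]
  ) => ContractSketch[K]["success"];
};

// Messages as they might arrive from the main thread (shapes are assumptions).
type IncomingSketch = {
  [K in keyof ContractSketch]: { id: number; tag: K; payload: ContractSketch[K]["payload"] };
}[keyof ContractSketch];

type OutgoingSketch =
  | { id: number; _tag: "Success"; value: unknown }
  | { id: number; _tag: "Failure"; message: string };

const handlersSketch: HandlersSketch = {
  filterData: ({ data, threshold }) => {
    if (threshold < 0) throw new Error("Threshold cannot be negative");
    return data.filter((n) => n > threshold);
  },
  calculatePrimes: ({ upperBound }) => {
    // Count primes by trial division (kept naive for brevity).
    let count = 0;
    for (let n = 2; n <= upperBound; n++) {
      let prime = true;
      for (let d = 2; d * d <= n; d++) {
        if (n % d === 0) {
          prime = false;
          break;
        }
      }
      if (prime) count += 1;
    }
    return count;
  }
};

// Route a message to its handler by tag and turn the outcome into a reply.
const dispatchSketch = (msg: IncomingSketch): OutgoingSketch => {
  try {
    const value =
      msg.tag === "filterData"
        ? handlersSketch.filterData(msg.payload)
        : handlersSketch.calculatePrimes(msg.payload);
    return { id: msg.id, _tag: "Success", value };
  } catch (e) {
    const message = e instanceof Error ? e.message : String(e);
    return { id: msg.id, _tag: "Failure", message };
  }
};
```

With Effect RPC you write only the handlers; the routing, reply correlation, and error encoding come from the server layer and the shared contract.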
Our worker is now a fully functional, type-safe RPC server, ready to receive requests.
Step 3: Calling the Worker (The Client Side)
Okay, back on the main thread (your UI code, perhaps), we set up an RpcClient to talk to the worker. The structure is quite similar to the server setup.
import * as BrowserWorker from "@effect/platform-browser/BrowserWorker";
import * as RpcClient from "@effect/rpc/RpcClient";
import * as Effect from "effect/Effect";
import * as Layer from "effect/Layer";
import { WorkerRpc } from "./worker-contract"; // Use the same contract!
// How you import the worker file depends on your bundler (Vite, Webpack, etc.)
import MyWorker from "./worker.ts?worker";

// Configure the RPC Client Layer to use the Worker protocol
const RpcProtocol = RpcClient.layerProtocolWorker({
  // How many worker instances should we manage in our pool?
  size: 1,
  // How many requests can one worker handle at the same time?
  // (Usually 1 for CPU tasks)
  concurrency: 1
}).pipe(
  // Tell the client *how* to create a new Worker instance when needed
  Layer.provide(BrowserWorker.layerPlatform(() => new MyWorker())),
  // If setting up the protocol fails, it's a critical error
  Layer.orDie
);

// Define an Effect Service to make the client easily injectable and testable
export class WorkerClient extends Effect.Service<WorkerClient>()(
  "@org/WorkerClient",
  {
    // This service depends on the RpcProtocol layer
    dependencies: [RpcProtocol],
    scoped: Effect.gen(function* () {
      // Create the client instance using our contract.
      // It needs the RpcProtocol layer.
      const client = yield* RpcClient.make(WorkerRpc);
      // Expose the client through this service
      return {
        client: client
      };
    })
  }
) {}
Client setup mirrors the server:
- Protocol (RpcProtocol):
  - RpcClient.layerProtocolWorker({ size: 1 }): Configures the client to use the worker protocol. We set size: 1 for now, meaning it will manage one worker instance.
  - BrowserWorker.layerPlatform(...): Provides the mechanism to spawn the actual browser Worker.
- Service (WorkerClient): A standard Effect service pattern.
  - RpcClient.make(WorkerRpc): Creates the type-safe client instance based on our shared contract. This effect requires the RpcProtocol layer (provided here through the service’s dependencies) and a Scope (implicitly provided because the service is built with scoped).
  - The service simply exposes the created client.
Step 4: Using the Client
This is where the setup pays off. Actually using the worker from your main application code becomes clean, type-safe, and feels just like any other Effect operation.
import * as Effect from "effect/Effect";
import { WorkerClient } from "./worker-service"; // Import our service

const program = Effect.gen(function* () {
  // Access the RPC client via the service
  const { client } = yield* WorkerClient;
  const largeData = Array.from({ length: 1_000_000 }, (_, i) => i);
  const filterThreshold = 999_990;
  yield* Effect.logInfo(
    `Sending ${largeData.length} items to worker for filtering...`
  );
  // Call the worker RPC like a regular function!
  const result = yield* client
    .filterData({
      data: largeData,
      threshold: filterThreshold
    })
    .pipe(
      // Standard Effect error handling applies
      Effect.catchTag("FilterError", () => Effect.succeed([]))
    );
  yield* Effect.logInfo(
    `Received ${result.length} filtered items from worker.`
  );
  return result;
});

// Run the program, providing the WorkerClient layer
program
  .pipe(
    Effect.provide(WorkerClient.Default),
    // Or runFork, runSync, etc.
    Effect.runPromise
  )
  .then((finalResult) => {
    console.log("Final filtered result count:", finalResult.length);
  });
Look how clean that is!
- client.filterData(...) returns an Effect.Effect<number[], FilterError>. Type safety is enforced end-to-end.
- We use standard Effect combinators like pipe, catchTag, etc., to handle results and errors.
- There’s no manual message creation, posting, listening, or ID correlation. Effect RPC handles the underlying complexity.
Scaling Up: Worker Pools Made Easy
Our current setup uses RpcClient.layerProtocolWorker({ size: 1 }), meaning the client manages a single worker instance. What if we have computationally intensive tasks and want to run several in parallel? Or perhaps different parts of our application need to perform distinct, heavy synchronous tasks off the main thread simultaneously? Running everything through one worker becomes a bottleneck.
This is where worker pools come in. The RpcClient.layerProtocolWorker layer is built on top of Effect’s core Pool abstraction (effect/Pool). We can easily configure it to manage multiple worker instances.
Let’s change our client setup to use a pool of 5 workers:
// Configure the RPC Client Protocol Layer for a pool of 5 workers
const RpcProtocol = RpcClient.layerProtocolWorker({
  // *** Configure the pool size ***
  size: 5
  // Optional: Control concurrency per worker instance (defaults to 1)
  // concurrency: 2,
  // Optional: Configure time-to-live for idle workers in dynamic pools
  // minSize: 2, maxSize: 10, timeToLive: "60 seconds"
}).pipe(
  Layer.provide(BrowserWorker.layerPlatform(() => new MyWorker())),
  Layer.orDie
);
That’s it! By simply changing { size: 1 } to { size: 5 }, our RpcClient now manages a pool of five worker instances.
How does this work internally?
The RpcClient.layerProtocolWorker uses Pool.make (or Pool.makeWithTTL for dynamic pools) from the effect/Pool module. This Pool is a generic, powerful primitive in Effect for managing pools of any acquired resource that needs lifecycle management (like database connections, network sockets, or, in our case, worker instances).
Here’s a simplified view of what happens when you use the client backed by this pool:
- client.filterData(...) is called: The RpcClient needs a worker to send the request to.
- pool.get: The client asks the Pool for a worker instance via pool.get. The Pool manages the available worker instances.
  - If an idle worker is available in the pool, the Pool returns it immediately.
  - If all workers are busy (i.e., currently handling other requests up to their configured concurrency), pool.get will wait (semantically block the fiber) until one becomes available.
  - If the pool hasn’t reached its configured size (or maxSize) and demand is high, the Pool might decide to acquire (spawn) a new worker instance using the acquire effect provided during setup (which layerProtocolWorker configures internally to use Worker.spawn).
- Scoped Usage: Crucially, pool.get returns an Effect.Effect<WorkerInstance, E, Scope.Scope>. This means the acquired worker is tied to a Scope. When the Effect that uses this worker (i.e., the effect representing your client.filterData call, including sending the request and receiving the response) completes (either successfully, with failure, or via interruption), the Scope is closed.
- Release: As part of the Scope finalization managed by the Pool, the worker instance is automatically released back to the pool, making it available for the next pool.get request. This ensures resources are never leaked and are efficiently reused.
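The acquire, wait, and release steps above can be modeled in a few lines of plain TypeScript. To be clear, this is a toy model built on Promises, not the effect/Pool implementation: it ignores concurrency per worker, time-to-live, and interruption, and every name in it (makePoolSketch, withResource, poolDemo) is hypothetical.

```typescript
// A toy fixed-size pool (an illustration only, not effect/Pool).
type PoolSketch<R> = {
  acquire: () => Promise<R>;
  release: (resource: R) => void;
  stats: () => { idle: number; created: number; waiting: number };
};

const makePoolSketch = <R>(create: () => R, size: number): PoolSketch<R> => {
  const idle: R[] = [];
  const waiters: Array<(resource: R) => void> = [];
  let created = 0;
  return {
    acquire: () => {
      // 1. Reuse an idle resource if there is one.
      const free = idle.pop();
      if (free !== undefined) return Promise.resolve(free);
      // 2. Otherwise create a new one while under the size limit.
      if (created < size) {
        created += 1;
        return Promise.resolve(create());
      }
      // 3. Otherwise wait until someone releases a resource.
      return new Promise<R>((resolve) => {
        waiters.push(resolve);
      });
    },
    release: (resource) => {
      // Hand the resource straight to the oldest waiter, or park it as idle.
      const next = waiters.shift();
      if (next !== undefined) next(resource);
      else idle.push(resource);
    },
    stats: () => ({ idle: idle.length, created, waiting: waiters.length })
  };
};

// "Scoped" usage: the resource is released even if the task throws.
const withResource = async <R, A>(
  pool: PoolSketch<R>,
  use: (resource: R) => Promise<A>
): Promise<A> => {
  const resource = await pool.acquire();
  try {
    return await use(resource);
  } finally {
    pool.release(resource);
  }
};

// A pool of at most two fake "workers", each just an object with an ID.
let spawnedCount = 0;
const poolDemo = makePoolSketch(() => ({ workerId: ++spawnedCount }), 2);
```

The try/finally in withResource plays the role that Scope finalization plays in Effect: callers never release by hand, so a failed or abandoned task cannot leak a worker.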
The effect/Pool implementation handles all the complexities of tracking item usage, managing minimum/maximum sizes, handling idle timeouts (timeToLive), and ensuring safe acquisition and release through the Scope mechanism. RpcClient.layerProtocolWorker leverages this robust primitive, providing a simple configuration interface ({ size: ... }) while benefiting from the power and safety of the underlying Pool.
You can even manually invalidate a specific worker instance obtained from the pool if you detect it’s behaving incorrectly, causing the pool to eventually discard and replace it.
By using a pool, your application can now handle multiple concurrent requests to the worker logic, distributing the load across the available worker instances automatically, without you needing to manage the individual workers or their lifecycles manually.
Conclusion
Workers are great for performance, but the standard API can be a drag to work with, leading to boilerplate and runtime errors. Effect RPC offers a much smoother, safer, and more productive path.
By defining a clear contract with Schema, using the RpcServer and RpcClient tailored for workers, and leveraging Effect’s built-in pooling, you get:
- End-to-End Type Safety: Eliminate runtime errors caused by mismatched message types or serialization issues.
- Simplified Communication: Interact with workers using familiar Effect patterns (Effect.gen, pipe, catchTag) instead of manual postMessage.
- Automatic Error Propagation: Worker errors are surfaced naturally within the client Effect’s error channel.
- Built-in Pooling: Easily scale worker usage by configuring RpcClient.layerProtocolWorker to manage a pool of instances, powered by the robust effect/Pool primitive.
- Composability: Integrate worker calls seamlessly into larger Effect workflows.
- Platform-Agnostic: Swap out the worker runner for a different platform (e.g. Node.js, Bun, etc.) with a single line of code.
It takes a little more setup than raw postMessage, sure, but the payoff in robustness, maintainability, and reduced debugging time is well worth it, especially as your application grows. Effect RPC lets you focus on what your worker needs to do, not how it talks to the main thread. Give it a try on your next project!