Lucas Barake

Using Effect RPC for Workers in TypeScript

This blog post is also available as a video on YouTube above.

Workers are pretty neat. They let you run code off the main thread, keeping your UI or server responsive during heavy computation. But actually using them? The developer experience can be… less than ideal.

If you’ve worked with standard Web Workers, for example, you’ve probably run into some of these headaches:

  • Manual message passing: You’re stuck with postMessage and blobs of loosely-typed data. Refactoring is risky, and you have to invent your own discriminators.
  • No automatic error propagation: If something throws in the worker, you have to invent your own serializable error protocol and handle it everywhere.
  • Messy async patterns: You need to wire up your own request/response system, track IDs, and avoid callback hell.
  • Type safety is on you: You hope the data you sent matches what the worker expects (and vice-versa), but you only find out for sure when things break at runtime.
  • No built-in concurrency control: If multiple parts of your app use the same worker, you can easily overload it. Distributing work across a pool? Good luck.

Basically, the standard Worker API leaves you to rebuild a lot of infrastructure for reliable, type-safe async communication—stuff that just feels… backwards.

This is where Effect RPC (@effect/rpc) comes in handy, especially if you’re already using the Effect ecosystem.

Effect RPC, along with @effect/platform, gives you a structured way to talk between the main thread and workers. You define a clear contract using schemas, and Effect handles the messy parts: serialization, routing requests, propagating errors, and even managing concurrency – all with type safety from end to end. Instead of manual postMessage chaos, you get a clean client that makes calling your worker feel like any other asynchronous operation.

The cool thing? Effect RPC is protocol agnostic. You can use it for Web Workers, WebSockets, HTTP, etc. Whatever you need.

Let’s walk through how to set up a much nicer worker experience using Effect RPC.

Step 1: Define the Contract (Get Everyone on the Same Page)

First up, we define the “API” for our worker. What can it do? What data does it need? What does it return? What errors can pop up? We use @effect/rpc and effect/Schema for this. Effect RPC asks you to be explicit about this contract, which gives you stronger guarantees than inferred types.

import * as Rpc from "@effect/rpc/Rpc";
import * as RpcGroup from "@effect/rpc/RpcGroup";
import * as Schema from "effect/Schema";

// Define potential errors using Schema.TaggedError
// (makes them serializable and easy to handle)
export class FilterError extends Schema.TaggedError<FilterError>()(
  "FilterError",
  {
    message: Schema.String
  }
) {}

// Define our RPC interface using RpcGroup.make
export class WorkerRpc extends RpcGroup.make(
  // Define the 'filterData' procedure
  Rpc.make("filterData", {
    // What a successful call returns
    success: Schema.Array(Schema.Number),
    // What typed errors this call can fail with
    error: FilterError,
    // What data this call needs
    payload: {
      data: Schema.Array(Schema.Number),
      threshold: Schema.Number
    }
  }),
  // Define the 'calculatePrimes' procedure
  Rpc.make("calculatePrimes", {
    success: Schema.Number,
    // This one isn't expected to fail with a specific *typed* error
    error: Schema.Never,
    payload: {
      upperBound: Schema.Number
    }
  })
) {}

So what’s happening here?

  • RpcGroup.make is like a container for all the procedures our worker will expose.
  • Rpc.make defines each specific procedure (filterData, calculatePrimes).
  • Schema spells out the exact shape of the data: inputs (payload), success results (success), and expected errors (error). Schema.TaggedError is great for defining custom error types that are also serializable.
  • Even if a function isn’t supposed to fail in a predictable way, we mark it (error: Schema.Never). If something unexpected does go wrong (like a bug), Effect calls that a “defect” and handles it separately.
  • This WorkerRpc definition acts as the single source of truth, keeping our main thread and worker thread code in sync.

If you haven’t played with effect/Schema before, you’re missing out. It’s not just a decoder—it’s totally bidirectional. That means it both decodes and encodes, so Effect uses the exact same schema to serialize and deserialize your data. You get full runtime safety, end to end.

Step 2: Implement the Worker Logic (The Server Side)

Now, let’s write the code that actually runs inside the worker. Instead of messy onmessage handlers, we just provide implementations for the procedures we defined in WorkerRpc, and let Effect’s RpcServer handle the communication details.

import * as BrowserRuntime from "@effect/platform-browser/BrowserRuntime";
import * as BrowserWorkerRunner from "@effect/platform-browser/BrowserWorkerRunner";
import * as RpcServer from "@effect/rpc/RpcServer";
import * as Effect from "effect/Effect";
import * as Layer from "effect/Layer";
import { FilterError, WorkerRpc } from "./worker-contract";

const isPrime = (num: number): boolean => {
  if (num <= 1) return false;
  if (num <= 3) return true;
  if (num % 2 === 0 || num % 3 === 0) return false;
  for (let i = 5; i * i <= num; i = i + 6) {
    if (num % i === 0 || num % (i + 2) === 0) return false;
  }
  return true;
};

// Implement the handlers for our RpcGroup procedures using an Effect Layer
// Layers are Effect's way of handling dependency injection and setup
// Think of them as constructors that you can retry, merge, and more
const Live = WorkerRpc.toLayer(
  Effect.gen(function* () {
    // This Effect runs once when the worker initializes
    yield* Effect.logInfo("Worker started");

    // Return an object where keys match our procedure names
    return {
      // Handler for 'filterData'
      filterData: (req) =>
        Effect.gen(function* () {
          yield* Effect.logInfo(
            `Worker received request to filter ${req.data.length} items with threshold ${req.threshold}`
          );

          if (req.threshold < 0) {
            yield* Effect.logError("Worker received invalid threshold");
            // To fail with a typed error, we return it wrapped in Effect.fail
            // (or just return the error instance directly here
            // thanks to TaggedError being yieldable)
            return yield* new FilterError({
              message: "Threshold cannot be negative"
            });
          }

          const filtered = req.data.filter((n) => n > req.threshold);

          yield* Effect.logInfo(
            `Worker finished filtering. Returning ${filtered.length} items.`
          );

          return filtered;
        }),

      // Handler for 'calculatePrimes'
      calculatePrimes: ({ upperBound }) =>
        Effect.gen(function* () {
          yield* Effect.logInfo(
            `Worker received request to calculate primes up to ${upperBound}`
          );

          let count = 0;
          for (let i = 2; i <= upperBound; i++) {
            if (isPrime(i)) {
              count += 1;
            }
          }

          yield* Effect.logInfo(
            `Worker finished calculating primes. Found ${count} primes.`
          );

          return count;
        })
    };
  })
);

// Define our RPC Server Configuration
const RpcWorkerServer = RpcServer.layer(WorkerRpc).pipe(
  // Plug in our actual implementations
  Layer.provide(Live),
  // Tell the server to use the standard Worker communication channel
  Layer.provide(RpcServer.layerProtocolWorkerRunner),
  // Provide the browser-specific parts needed to run in a worker context
  Layer.provide(BrowserWorkerRunner.layer)
);

// Start the worker runtime
BrowserRuntime.runMain(
  // BrowserWorkerRunner.launch sets everything up and handles cleanup
  BrowserWorkerRunner.launch(RpcWorkerServer).pipe(
    Effect.tapErrorCause((error) => Effect.logError("[Worker] crashed", error))
  )
);

Let’s break down the worker setup:

  1. Handlers (ImplementationLive): We use WorkerRpc.toLayer(...) to create a Layer that provides the implementations for our defined RPCs (FilterData, Test). The structure mirrors the RpcGroup definition.
  2. RPC Server Configuration (RpcWorkerServer):
    • RpcServer.layer(WorkerRpc): Creates the core server logic tied to our contract.
    • RpcServer.layerProtocolWorkerRunner: This is the magic piece. It tells the RpcServer to listen for and respond to messages using the standard worker postMessage/onmessage mechanism, mediated by @effect/platform/WorkerRunner.
    • BrowserWorkerRunner.layer: The layerProtocolWorkerRunner needs a PlatformRunner service, which this provides for the browser context. This means layerProtocolWorkerRunner is platform-agnostic. You can support any runner you want (e.g. Node.js, Bun, etc.) with just one line of code.
  3. Running the Worker: BrowserWorkerRunner.launch(RpcWorkerServer) starts the server layer and integrates it with the worker’s lifecycle, ensuring proper setup and teardown.

Our worker is now a fully functional, type-safe RPC server, ready to receive requests.

Step 3: Calling the Worker (The Client Side)

Okay, back on the main thread (your UI code, perhaps), we set up an RpcClient to talk to the worker. The structure is quite similar to the server setup.

import * as BrowserWorker from "@effect/platform-browser/BrowserWorker";
import * as RpcClient from "@effect/rpc/RpcClient";
import * as Effect from "effect/Effect";
import * as Layer from "effect/Layer";
import { WorkerRpc } from "./worker-contract"; // Use the same contract!
// How you import the worker file depends on your bundler (Vite, Webpack, etc.)
import MyWorker from "./worker.ts?worker";

// Configure the RPC Client Layer to use the Worker protocol
const RpcProtocol = RpcClient.layerProtocolWorker({
  // How many worker instances should we manage in our pool?
  size: 1,
  // How many requests can one worker handle at the same time?
  // (Usually 1 for CPU tasks)
  concurrency: 1
}).pipe(
  // Tell the client *how* to create a new Worker instance when needed
  Layer.provide(BrowserWorker.layerPlatform(() => new MyWorker())),
  // If setting up the protocol fails, it's a critical error
  Layer.orDie
);

// Define an Effect Service to make the client easily injectable and testable
export class WorkerClient extends Effect.Service<WorkerClient>()(
  "@org/WorkerClient",
  {
    // This service depends on the RpcProtocol layer
    dependencies: [RpcProtocol],
    scoped: Effect.gen(function* () {
      // Create the client instance using our contract.
      // It needs the RpcProtocol layer.
      const client = yield* RpcClient.make(WorkerRpc);
      // Expose the client through this service
      return {
        client: client
      };
    })
  }
) {}

Client setup mirrors the server:

  1. Protocol (RpcProtocol):
    • RpcClient.layerProtocolWorker({ size: 1 }): Configures the client to use the worker protocol. We set size: 1 for now, meaning it will manage one worker instance.
    • BrowserWorker.layerPlatform(...): Provides the mechanism to spawn the actual browser Worker.
  2. Service (WorkerClient): A standard Effect service pattern.
    • RpcClient.make(WorkerRpc): Creates the type-safe client instance based on our shared contract. This effect requires the RpcProtocol layer (provided via Effect.provide) and a Scope (implicitly provided by Layer.scoped).
    • The service simply exposes the created client.

Step 4: Using the Client

This is where the setup pays off. Actually using the worker from your main application code becomes clean, type-safe, and feels just like any other Effect operation.

import * as Effect from "effect/Effect";
import { WorkerClient } from "./worker-service"; // Import our service

const program = Effect.gen(function* () {
  // Access the RPC client via the service
  const { client } = yield* WorkerClient;

  const largeData = Array.from({ length: 1_000_000 }, (_, i) => i);
  const filterThreshold = 999_990;

  yield* Effect.logInfo(
    `Sending ${largeData.length} items to worker for filtering...`
  );

  // Call the worker RPC like a regular function!
  const result = yield* client
    .filterData({
      data: largeData,
      threshold: filterThreshold
    })
    .pipe(
      // Standard Effect error handling applies
      Effect.catchTag("FilterError", () => Effect.succeed([]))
    );

  yield* Effect.logInfo(
    `Received ${result.length} filtered items from worker.`
  );

  return result;
});

// Run the program, providing the WorkerClient layer
program
  .pipe(
    Effect.provide(WorkerClient.Default),
    // Or runFork, runSync, etc.
    Effect.runPromise
  )
  .then((finalResult) => {
    console.log("Final filtered result count:", finalResult.length);
  });

Look how clean that is!

  • client.filterData(...) returns an Effect.Effect<number[], FilterError>. Type safety is enforced end-to-end.
  • We use standard Effect combinators like pipe, catchTag, etc., to handle results and errors.
  • There’s no manual message creation, posting, listening, or ID correlation. Effect RPC handles the underlying complexity.

Scaling Up: Worker Pools Made Easy

Our current setup uses RpcClient.layerProtocolWorker({ size: 1 }), meaning the client manages a single worker instance. What if we have computationally intensive tasks and want to run multiple in parallel? Or perhaps different parts of our application need to perform distinct, heavy synchronous tasks off the main thread simultaneously? Running everything through one worker becomes a bottleneck.

This is where worker pools come in. The RpcClient.layerProtocolWorker layer is built on top of Effect’s core Pool abstraction (@effect/Pool). We can easily configure it to manage multiple worker instances.

Let’s change our client setup to use a pool of 5 workers:

// Configure the RPC Client Protocol Layer for a pool of 5 workers
const RpcProtocol = RpcClient.layerProtocolWorker({
  // *** Configure the pool size ***
  size: 5
  // Optional: Control concurrency per worker instance (defaults to 1)
  // concurrency: 2,
  // Optional: Configure time-to-live for idle workers in dynamic pools
  // minSize: 2, maxSize: 10, timeToLive: "60 seconds"
}).pipe(
  Layer.provide(BrowserWorker.layerPlatform(() => new MyWorker())),
  Layer.orDie
);

That’s it! By simply changing { size: 1 } to { size: 5 }, our RpcClient now manages a pool of five worker instances.

How does this work internally?

The RpcClient.layerProtocolWorker uses Pool.make (or Pool.makeWithTTL for dynamic pools) from the effect/Pool module. This Pool is a generic, powerful primitive in Effect for managing pools of any acquired resource that needs lifecycle management (like database connections, network sockets, or, in our case, worker instances).

Here’s a simplified view of what happens when you use the client backed by this pool:

  1. client.filterData(...) is called: The RpcClient needs a worker to send the request to.
  2. pool.get: The client asks the Pool for a worker instance via pool.get. The Pool manages the available worker instances.
    • If an idle worker is available in the pool, the Pool returns it immediately.
    • If all workers are busy (i.e., currently handling other requests up to their configured concurrency), pool.get will wait (semantically block the fiber) until one becomes available.
    • If the pool hasn’t reached its configured size (or maxSize) and demand is high, the Pool might decide to acquire (spawn) a new worker instance using the acquire effect provided during setup (which layerProtocolWorker configures internally to use Worker.spawn).
  3. Scoped Usage: Crucially, pool.get returns an Effect.Effect<WorkerInstance, E, Scope.Scope>. This means the acquired worker is tied to a Scope. When the Effect that uses this worker (i.e., the effect representing your client.filterData call, including sending the request and receiving the response) completes (either successfully, with failure, or via interruption), the Scope is closed.
  4. Release: As part of the Scope finalization managed by the Pool, the worker instance is automatically released back to the pool, making it available for the next pool.get request. This ensures resources are never leaked and are efficiently reused.

The effect/Pool implementation handles all the complexities of tracking item usage, managing minimum/maximum sizes, handling idle timeouts (timeToLive), and ensuring safe acquisition and release through the Scope mechanism. RpcClient.layerProtocolWorker leverages this robust primitive, providing a simple configuration interface ({ size: ... }) while benefiting from the power and safety of the underlying Pool.

You can even manually invalidate a specific worker instance obtained from the pool if you detect it’s behaving incorrectly, causing the pool to eventually discard and replace it.

By using a pool, your application can now handle multiple concurrent requests to the worker logic, distributing the load across the available worker instances automatically, without you needing to manage the individual workers or their lifecycles manually.

Conclusion

Workers are great for performance, but the standard API can be a drag to work with, leading to boilerplate and runtime errors. Effect RPC offers a much smoother, safer, and more productive path.

By defining a clear contract with Schema, using the RpcServer and RpcClient tailored for workers, and leveraging Effect’s built-in pooling, you get:

  • End-to-End Type Safety: Eliminate runtime errors caused by mismatched message types or serialization issues.
  • Simplified Communication: Interact with workers using familiar Effect patterns (Effect.gen, pipe, catchTag) instead of manual postMessage.
  • Automatic Error Propagation: Worker errors are surfaced naturally within the client Effect’s error channel.
  • Built-in Pooling: Easily scale worker usage by configuring RpcClient.layerProtocolWorker to manage a pool of instances, powered by the robust effect/Pool primitive.
  • Composability: Integrate worker calls seamlessly into larger Effect workflows.
  • Platform-Agnostic: Swap out the worker runner for a different platform (e.g. Node.js, Bun, etc.) with a single line of code.

It takes a little more setup than raw postMessage, sure, but the payoff in robustness, maintainability, and reduced debugging time is well worth it, especially as your application grows. Effect RPC lets you focus on what your worker needs to do, not how it talks to the main thread. Give it a try on your next project!