Avatar

Francisco Moschetti

Uruguay.
Web developer alergic to leetcode.

Back to blog

Discriminated unions with zod.

I’m currently sketching some of the core logic for a product I have in mind called inboxrobots, that allows you to create AI agents that have an email inbox (think [email protected], [email protected]).

One of the core mechanics I need to have in place is a way to define RAG sources that agents can use to write helpful responses.

The thing is, you need to define multiple types of sources, with different kinds of input data, and retrieving logic.

I want to have a unique table in my DB for the source definition, and ideally I don’t want to make a change to it everytime I want add a new source type.

The db structure I went with is this:


export const sourceDefinitionTable = sqliteTable("source", {
  id: integer("id").primaryKey({ autoIncrement: true }),
  organizationId: text("organization_id")
    .notNull()
    .references(() => organization.id, { onDelete: "cascade" }),
  mailboxId: integer("mailbox_id")
    .notNull()
    .references(() => mailboxesTable.id, { onDelete: "cascade" }),
  kind: text({ enum: ["text_input", "file", "url", "sitemap"] }).notNull(),
  sourceConfig: text({ mode: "json" }).notNull(),
  createdAt: integer('created_at', { mode: 'timestamp' })
    .default(sql`(unixepoch())`)
    .notNull(),
  updatedAt: integer('updated_at', { mode: 'timestamp' })
    .default(sql`(unixepoch())`)
    .notNull(),
})
  

The “kind” attribute allows me to set up filters and queries based on source type without looking at the source config.

The “sourceConfig” field is just a text field that stores a json object.

This gives me the flexibility to add new sources and edit existing source input requirements without having to do a db migration.

There is a problem tho, how do you make sure the json content has the right format fo the kind, and validate when reading it form your db?

Enter zod

Zod lets us define a flexible source config schema that changes the required fields based on kind.

import { z } from "zod";

export const baseSourceConfigSchema = z.object({
  name: z.string(),
  description: z.string().optional(),
});

export const textInputConfigSchema = baseSourceConfigSchema.extend({
  content: z.string(),
});

export const fileConfigSchema = baseSourceConfigSchema.extend({
  fileUrl: z.string().url(),
  mimeType: z.string(),
  fileName: z.string(),
});

export const urlConfigSchema = baseSourceConfigSchema.extend({
  url: z.string().url(),
  crawlDepth: z.number().default(0),
  selectors: z.array(z.string()).optional(),
});

export const sitemapConfigSchema = baseSourceConfigSchema.extend({
  sitemapUrl: z.string().url(),
  includePatterns: z.array(z.string()).optional(),
  excludePatterns: z.array(z.string()).optional(),
});

// Discriminated union for all configs
export const sourceConfigSchema = z.discriminatedUnion("kind", [
  z.object({ kind: z.literal("text_input"), config: textInputConfigSchema }),
  z.object({ kind: z.literal("file"), config: fileConfigSchema }),
  z.object({ kind: z.literal("url"), config: urlConfigSchema }),
  z.object({ kind: z.literal("sitemap"), config: sitemapConfigSchema }),
]);

export type SourceConfig = z.infer<typeof sourceConfigSchema>;
export type SourceKind = SourceConfig["kind"]; 

Using the sources in my app.

For my use case, there is two things i must do with my source definitons:

    1. Turn the source definition into indexable content: This can entail anything from reading a pdf file, to scraping a website.
    1. Turn the text of the doc into indexable chunks.

These two tasks are the core of my source handling logic.

I want to have a unique Retriever class for each source, so I can isolate the required logic for each source in its own implementation.

The right retriever class will be passed to a main indexer class that will do the chunking and embedding gen.

Here is the factory for my source retriever strategy:

import type { SourceConfig } from "../../sources/config";
import { FileRetrieverStrategy } from "./file.strategy";
import { UrlRetrieverStrategy } from "./url.strategy";
import { TextRetrieverStrategy } from "./text.strategy";
import { SitemapRetrieverStrategy } from "./sitemap.strategy";

export interface SourceDocument {
  content: string;
  metadata: Record<string, any>;
}

export interface RetrieverStrategy {
  retrieve(): Promise<SourceDocument[]>
}

export class RetrieverFactory {
  static create(config: SourceConfig) {
    switch (config.kind) {
      case "file":
        return new FileRetrieverStrategy(config)
      case "url":
        return new UrlRetrieverStrategy(config)
      case "text_input":
        return new TextRetrieverStrategy(config)
      case "sitemap":
        return new SitemapRetrieverStrategy(config)
    }
  }
}  

Here is the indexer that uses the retriever:

import type { SourceConfig } from "../sources/config";
import type { RetrieverStrategy } from "./retrievers";

export interface SourceChunk {
  content: string;
  metadata: Record<string, any>;
  sourceId: number;
  startIndex: number;
  endIndex: number;
}

export interface ChunkOptions {
  chunkSize?: number;
  chunkOverlap?: number;
  separator?: string;
}

export class SourceIndexer {

  constructor(private config: SourceConfig, private retriever: RetrieverStrategy) { }

  async split(options: ChunkOptions): Promise<SourceChunk> {
    // ...
  }

  generateChunkEmbeddings(chunk: SourceChunk) {
    // ...
  }

}

Putting it all together

Those core pieces of logic can now be put to work in my source service, under the processSource method of my SourceService class:

import type { ISourceStore } from "$lib/server/stores/sources.store"
import { SourceIndexer } from "../embeddings"
import { RetrieverFactory } from "../embeddings/retrievers"
import { sourceConfigSchema, type SourceConfig } from "./config"

export class BadConfigSchemaError extends Error {
  constructor() {
    super("Source config has an invalid schema.")
    this.name = "BadConfigSchema"
  }
}

export class SourceService {
  constructor(private store: ISourceStore) { }

  private async parseSourceConfig(sourceDefinitionId: number): Promise<SourceConfig> {
    const sourceDefinition = await this.store.getById(sourceDefinitionId)
    const rawConfig = JSON.parse(sourceDefinition.sourceConfig as string)
    const parsedConfig = sourceConfigSchema.safeParse(rawConfig)
    if (!parsedConfig.success) {
      throw new BadConfigSchemaError()
    }
    return parsedConfig.data
  }

  async indexSourceSketch(originalSourceId: number) {
    const config = await this.parseSourceConfig(originalSourceId)
    const retriever = RetrieverFactory.create(config)
    const indexer = new SourceIndexer(config, retriever)
    const sourceChunks = indexer.split({ chunkSize: 500 })

    // ...
  }
}
  

Conclusion

Most of the difficulty of this use case is solved at the time of defining our data structure, so we can easily create the needed retriever class for each source kind, having isolated and testeable logic for each.

Using zod as the source of truth here lets us have all the benefits of storing raw JSON without any of the problems. This product will use Sveltekit, so i can easily validate the user submitted forms with the source zod schema in my form server function.