How/Why to Sweep Async Tasks Under a Postgres Table

I like slim and stupid servers, where each endpoint wraps a very dumb DB query.

Dumb queries are fast. Fast queries make websites smooth and snappy. Keep those
click/render loops sacred.

Sweep complexity under a task table:

router.post("/signup", async ctx => {
  const { email, password } = await ctx.request.body().value;
  // One statement, one transaction: the user row and its welcome-email task
  // are committed together or not at all.
  const [{ usr_id } = {}] = await sql`
    with usr_ as (
      insert into usr (email, password)
      values (${email}, crypt(${password}, gen_salt('bf')))
      returning *
    ), task_ as (
      insert into task (task_type, params)
      values ('SEND_EMAIL_WELCOME', ${{ email }})
    )
    select * from usr_
  `;
  await ctx.cookies.set("usr_id", usr_id);
  ctx.response.status = 204;
});

Of course using mailgun.send is easier than queuing it in a task table.
Adding indirection rarely makes systems less complex. But somehow I’m here to
advocate exactly that. You may ignore my manifesto and skip to my implementation
at the end.

Secret Surface Error Area

Customers don’t care about cosmic rays. They want a thing. More importantly, they
want immediate confirmation of their thing. They want to offload the mental
burden of their goal.

For them to delegate that responsibility, your DB is probably the only thing
that matters. Once information is committed to your database, you can
confidently say “we’ll take it from here”.

You can send emails later. You can process payments later. You can do almost
anything later. Just tell your customer they can continue with their goddamn
day.

Delight your customers with clear feedback.

Delight your computers by writing to one place at a time.

Never Handroll Your Own Two-Phase Commit

Writing to two places at “the same time” is sinful.

When the gods gave us computer storage, the people became unhappy. They cried,
“What is consistency? Where are our guarantees? Why must I fsync?” And so they
wore sackcloth and ashes for many years in their coding caves.

The people were overjoyed when the gods scrawled Postgres (and other inferior
databases) onto stone tablets. The holy “database transactions” allowed
humankind to pretend that they could read/write to multiple places at the same
time.

To this day, databases sometimes work.

But some developers deny the works of the gods. They mix multiple tools, and so
commit the sin of writing to multiple places.

“Oh, we’ll just send a pubsub message after we insert the row.” But data is
lost. Message before insert row? Data lost. All blasphemers are doomed to
reinvent two-phase commit.
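
To make the failure mode concrete, here is a toy in-memory sketch, not real database code. The names Store, dualWrite, and singleWrite are made up for illustration, and the "transaction" is just a copy-then-swap. It only shows why a second, independent write can leave you half-done.

```typescript
// Toy model only: `Store`, `dualWrite`, and `singleWrite` are hypothetical names.
type Task = { task_type: string; params: Record<string, unknown> };
type Draft = { rows: string[]; tasks: Task[] };

class Store {
  rows: string[] = [];
  tasks: Task[] = [];

  // Apply every write in `fn` or none of them, mimicking a transaction.
  transaction(fn: (draft: Draft) => void): void {
    const draft: Draft = { rows: [...this.rows], tasks: [...this.tasks] };
    fn(draft); // if fn throws, the draft is simply discarded
    this.rows = draft.rows;
    this.tasks = draft.tasks;
  }
}

// Two places: the store, then an external broker that can fail on its own.
function dualWrite(store: Store, broker: string[], email: string, brokerUp: boolean): void {
  store.transaction((d) => { d.rows.push(email); }); // first write commits
  if (!brokerUp) throw new Error("broker unavailable"); // message is lost
  broker.push(email);
}

// One place: the user row and its task commit together or not at all.
function singleWrite(store: Store, email: string, crash: boolean): void {
  store.transaction((d) => {
    d.rows.push(email);
    if (crash) throw new Error("simulated crash mid-transaction");
    d.tasks.push({ task_type: "SEND_EMAIL_WELCOME", params: { email } });
  });
}
```

With dualWrite, a broker outage leaves a user row with no message behind it. With singleWrite, a crash leaves nothing at all, which is the boring outcome you want.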

One Way To Do Things

I like LEGO. I like Play-Doh. I like Lincoln Logs. I do not, however, like
mixing them together.

It’s painful to investigate systems when state is spread across SQS, Redis,
PubSub, Celery, Airflow, etc. I shouldn’t have to open a local detective agency
to find out why a process isn’t running as expected.

Most modern projects use SQL. Because I dislike mixing systems, I try to take
SQL as far as possible.

Of all the SQL databases, Postgres currently offers the best mix of modern
first-class features and third-party extensions. Postgres can be your knock-off
Kafka, artificial Airflow, crappy Clickhouse, nasty Elasticsearch, poor man’s
PubSub, on-sale Celery, etc.

Sure, Postgres doesn’t have all the fancy features of each specialized system.
But colocating queue/pipeline/async data in your main database eliminates swaths
of errors. In my experience, transaction guarantees supersede everything else.

TODO-Driven Development

while (true) {
  // const rows = await ...
  for (const { task_type, params } of rows)
    if (task_type in tasks) {
      await tasks[task_type](tx, params);
    } else {
      // Unimplemented task types are logged, not dropped.
      console.error(`Task type not implemented: ${task_type}`);
    }
}

With a simple retry system, asynchronous decoupling magically tracks all your
incomplete flows.

No need to rely upon Jira — bugs and unimplemented tasks will be logged and
retried. Working recursively from error queues is truly a wonderful experience.
All your live/urgent TODOs are printed to the same place (in development and in
production).
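
As a rough illustration of that workflow, here is a synchronous, in-memory sketch of a single pass over a queue. runOnce and its types are hypothetical names, not part of the worker shown later. The point is only that failed or unimplemented tasks stay queued and show up in one error list.

```typescript
// Hypothetical in-memory stand-in for the task table and worker loop.
type Params = Record<string, unknown>;
type Task = { task_type: string; params: Params };
type Handler = (params: Params) => void;

// One pass: tasks that complete are dropped; tasks that throw or have no
// handler are kept for the next pass and reported in `errors`.
function runOnce(
  queue: Task[],
  handlers: Record<string, Handler>,
): { remaining: Task[]; errors: string[] } {
  const remaining: Task[] = [];
  const errors: string[] = [];
  for (const task of queue) {
    try {
      // Own-property check so inherited keys like "toString" are not handlers.
      if (!Object.prototype.hasOwnProperty.call(handlers, task.task_type)) {
        throw new Error(`Task type not implemented: ${task.task_type}`);
      }
      handlers[task.task_type](task.params);
    } catch (err) {
      errors.push(err instanceof Error ? err.message : String(err));
      remaining.push(task);
    }
  }
  return { remaining, errors };
}
```

Queue a task type before writing its handler, and the error list becomes your TODO list. Once the handler ships, the next pass drains the backlog.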

With this paradigm, you’ll gravitate towards scalable pipelines. Wishful
thinking makes natural architecture.

Human Fault Tolerance

Many systems foist useless retry-loops onto humans.

Humans should receive feedback for human errors. But humans should not receive
feedback for problems that can be handled by computers (and their software
developers).

Remember, all your retry-loops have to happen somewhere. Be careful what you
delegate to customers and developers. Your business’s bottom line is bounded by
human patience; computers have infinitely more patience than humans.

Show Me The Code

Here’s the task table:

create table task
( task_id bigint primary key not null generated always as identity
, task_type text not null -- consider using enum
, params jsonb not null -- hstore also viable
, created_at timestamptz not null default now()
, unique (task_type, params) -- optional, for pseudo-idempotency
)

Here’s the code for the task worker:

const tasks = {
  SEND_EMAIL_WELCOME: async (tx, params) => {
    const { email } = params;
    if (!email) throw new Error(`Bad params ${JSON.stringify(params)}.`);
    await sendEmail({ email, body: "WELCOME" });
  },
};

(async () => {
  while (true) {
    try {
      while (true) {
        await sql.begin(async (tx: any) => {
          const rows = await tx`
            delete from task
            where task_id in
            ( select task_id
              from task
              order by random() -- use tablesample for better performance
              for update
              skip locked
              limit 1
            )
            returning task_id, task_type, params::jsonb as params
          `;
          for (const { task_type, params } of rows)
            if (task_type in tasks) {
              await tasks[task_type](tx, params);
            } else {
              throw new Error(`Task type not implemented: ${task_type}`);
            }
          if (rows.length <= 0) {
            await delay(10 * 1000);
          }
        });
      }
    } catch (err) {
      console.error(err);
      await delay(1 * 1000);
    }
  }
})();

A few notable features of this snippet:

  • The task row will not be deleted if sendEmail fails. The PG transaction
    will be rolled back. The row and sendEmail will be retried.
  • The PG transaction tx is passed along to tasks. This is convenient for
    marking rows as “processed”, etc.
  • Transactions make error-handling so much nicer. Always organize reversible
    queries before irreversible side-effects (e.g. mark DB status before sending
    the email). Remember that the DB commits at the end.
  • Because of skip locked, you can run any number of these workers in parallel.
    They will not step on each other’s toes.
  • Random ordering is technically optional, but it makes the system more
    resilient to errors. With adequate randomness, a single task type cannot block
    the queue for all.
  • Use order by (case task_type ... end), random() to create an easy
    prioritized queue.
  • Limiting the number of retries makes the code more complicated, but it is
    definitely worth it for user-facing side-effects like emails.
  • if (rows.length <= 0) prevents overzealous polling. Your DBA will be
    grateful.
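
The prioritized-queue and retry-limit ideas above can be sketched in memory like this. Everything here is an assumption for illustration: the task table shown earlier has no attempts column, and PRIORITY, MAX_ATTEMPTS, pickNext, and afterFailure are made-up names.

```typescript
// Hypothetical shape: `attempts` is not a column in the task table above.
type Task = { task_id: number; task_type: string; attempts: number };

const MAX_ATTEMPTS = 3; // assumed cap, tune to taste

// Lower number means higher priority; unlisted types go last.
// Mirrors `order by (case task_type ... end), random()`.
const PRIORITY = new Map<string, number>([["SEND_EMAIL_WELCOME", 0]]);

function pickNext(tasks: Task[], rand: () => number = Math.random): Task | undefined {
  if (tasks.length === 0) return undefined;
  const rank = (t: Task): number => PRIORITY.get(t.task_type) ?? Number.MAX_SAFE_INTEGER;
  const best = Math.min(...tasks.map(rank));
  // Random choice within the best priority tier keeps one bad task from
  // blocking its whole tier.
  const candidates = tasks.filter((t) => rank(t) === best);
  return candidates[Math.floor(rand() * candidates.length)];
}

// After a failed attempt: requeue with a bumped counter, or give up.
function afterFailure(task: Task): { requeue?: Task; dead?: Task } {
  const next: Task = { ...task, attempts: task.attempts + 1 };
  return next.attempts >= MAX_ATTEMPTS ? { dead: next } : { requeue: next };
}
```

In the real worker a failed task rolls back its transaction, so an attempts counter would have to be committed separately from the rolled-back work. That is where most of the extra complexity comes from.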

🕒 Posted on 1763754443
