person_distinct_id table makes for an interesting case study on how initial schema design flaws were exposed over time and how they were fixed.

Problem being solved

PostHog needs to know who are the users associated with each event.

In frontend libraries like posthog-js, when persons land on a site they're initially anonymous with a random distinct ID. As persons log in or sign up, posthog.identify should be called to signal that the anonymous person is actually some logged in person and their prior events should be grouped together.

The semantics of this have changed significantly with person-on-events project.

Schema

SQL
CREATE TABLE person_distinct_id
(
distinct_id VARCHAR,
person_id UUID,
team_id Int64,
_sign Int8 DEFAULT 1,
is_deleted Int8 ALIAS if(_sign==-1, 1, 0),
_timestamp DateTime,
_offset UInt64
) ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/noshard/posthog.person_distinct_id', '{replica}-{shard}', _sign)
ORDER BY (team_id, distinct_id, person_id)

The is_deleted column is not actually being written to, it is dynamically calculated based on the _sign column.

This table was queried often joined with events table along the following lines:

SQL
SELECT avg(count())
FROM events
INNER JOIN (
SELECT distinct_id, argMax(person_id, _timestamp) as person_id
FROM (
SELECT distinct_id, person_id, max(_timestamp) as _timestamp
FROM person_distinct_id
WHERE team_id = 2
GROUP BY person_id, distinct_id, team_id
HAVING max(is_deleted) = 0
)
GROUP BY distinct_id
) AS pdi ON (pdi.distinct_id = events.distinct_id)
WHERE team_id = 2
GROUP BY pdi.person_id

Design decision: no sharding

Since this table was almost always joined against the events table, this table was not sharded.

Sharding it means that each shard would need to send back all the events and person_distinct_id sub-query result rows to coordinator node to execute queries, which would be expensive and slow.

Design decision: CollapsingMergeTree

The given distinct_id belonging to a person can change over time as posthog.identify or posthog.alias are called.

For this reason the data needs to be constantly updated, yet updating data in ClickHouse requires rewriting large chunks of data.

Rather than rewriting data, we opted to use CollapsingMergeTree. CollapsingMergeTree adds special behavior to ClickHouse merge operation: if on a merge rows with identical ORDER BY values are seen, they are collapsed according to _sign column:

  • If sum of signs _sign was positive, new row has _sign of 1.
  • Otherwise, the row was removed.

This was used to update-via-insert:

  • On a change, the old person_id was discarded via emitting a row _sign of -1
  • On a change, the new person_id row was emitted with _sign of 1
  • At query-time, the resulting rows were aggregated together to find the current state of the world

Due to this logic, both person_id and distinct_id needed to be in the ORDER BY key.

Problem: CollapsingMergeTree for updates

CollapsingMergeTree is not ideal for frequently updating a single row as merges occur in an non-deterministic order and that will cause trouble if subsequent rows signifying deletes get discarded before being merged with an "insert" row.

When updating columns ReplacingMergeTree engine tables with an explicit version column has proven to be reliable.

Problem: Expensive queries

In December 2021, PostHog started seeing significant performance problems and out-of-memory errors due to this schema for largest users.

The problem was two-fold:

  • JOINs are inherently expensive in ClickHouse as the right-hand-side of the join (person_distinct_id subquery) would be loaded into memory
  • The schema was inefficient, emitting multiple rows per person and requiring post-aggregation

Improved schema

To fix both problems, a new table was created:

SQL
CREATE TABLE person_distinct_id2
(
team_id Int64,
distinct_id VARCHAR,
person_id UUID,
is_deleted Int8,
version Int64 DEFAULT 1,
_timestamp DateTime,
_offset UInt64,
_partition UInt64
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/noshard/posthog.person_distinct_id2', '{replica}-{shard}', version)
ORDER BY (team_id, distinct_id)
SETTINGS index_granularity = 512

JOINs with this table look something like this:

SQL
SELECT avg(count())
FROM events
INNER JOIN (
SELECT distinct_id,
argMax(person_id, version) as person_id
FROM person_distinct_id2
WHERE team_id = 2
GROUP BY distinct_id
HAVING argMax(is_deleted, version) = 0
) AS pdi ON e.distinct_id = pdi.distinct_id
WHERE team_id = 2
GROUP BY pdi.person_id

This schema:

  • Was over 2x faster to query for large teams while requiring less memory
  • Had explicit versioning logic built in
  • Required fewer Kafka messages and traffic
  • Lowered index_granularity for faster point queries
  • Leveraged ReplacingMergeTree to ensure data consistency

Closing notes

Even with improvements JOINs still are expensive and after the person-on-events project we were able to store person_id column on the events table to great effect.

Questions?

Was this page useful?

Next article

Shipping & releasing

Any process is a balance between speed and control. If we have a long process that requires extensive QA and 10 approvals, we will never make mistakes because we will never release anything. However, if we have no checks in place, we will release quickly but everything will be broken. For this reason we have some best practices for releasing things, and guidelines on how to ship. How to decide what to build Set milestones To start, Product and Engineering should align on major milestones (e.g…

Read next article