On this page

ClickHouse is our main analytics backend.

Instead of data being inserted directly into ClickHouse, it itself pulls data from Kafka. This makes our ingestion pipeline more resilient towards outages.

The following sections go more into depth in how this works exactly.

Events

In order to make PostHog easy to scale, we use a sharded ClickHouse setup.

clickhouse_events_json topic
reads from
pushes data to
pushes data to
reads from
kafka_events table
(Kafka table engine)
events_mv table
(Materialized view)
writable_events table
(Distributed table engine)
events table
(Distributed table engine)
sharded_events table
(ReplicatedReplacingMergeTree table engine)
Kafka

kafka_events table

kafka_events table uses the Kafka table engine

Tables using this engine set up Kafka consumers that consume data on read queries to the table, advancing the offset for the consumer group in Kafka.

events_mv Materialized View

events_mv table is a Materialized View.

In this case it acts as a data pipe which periodically pulls data from kafka_events and pushes it into the target (events) table.

writable_events table

writable_events table uses the Distributed table engine.

The schema looks something like as follows:

SQL
CREATE TABLE posthog.writable_events (
`uuid` UUID,
`event` String,
`properties` String,
`timestamp` DateTime64(6, 'UTC'),
`team_id` Int64,
`distinct_id` String,
`elements_hash` String,
`created_at` DateTime64(6, 'UTC'),
`_timestamp` DateTime,
`_offset` UInt64,
`elements_chain` String
) ENGINE = Distributed('posthog', 'posthog', 'sharded_events', sipHash64(distinct_id))

This table:

  • Gets pushed rows from events_mv table.
  • For every row, it calculates a hash based on the distinct_id column.
  • Based on the hash, sends the row to the right shard on the posthog cluster into the posthog.sharded_events table.
  • Does not contain materialized columns as they would hinder INSERT queries.

sharded_events table

sharded_events table uses the ReplicatedReplacingMergeTree.

This table:

  • Stores the event data.
  • Is sharded and replicated.
  • Is queried indirectly via the events table.

events table

Similar to writable_events, the events table uses the Distributed table engine.

This table is being queried from app and for every query, figures out what shard(s) to query and aggregates the results from shards.

Note that even though the ReplacingMergeTree engine is used, we should avoid writing duplicate data into the table, as deduplication is not a guarantee.

Persons

The source of truth for person info and person to distinct_id mappings is in PostgreSQL, but to speed up queries we replicate it to ClickHouse. Both tables use the ReplacingMergeTree and collapse by the version column, which is incremented every time a person is updated.

Note that querying both tables requires handling duplicated rows. Check out PersonQuery code for an example of how it's done.

In sharded setups, person and person_distinct_id tables are not sharded and instead replicated onto each node to avoid JOINs over the network.

Questions?

Was this page useful?

Next article

Querying data

This page provides a high-level overview of how queries are run when creating insights in PostHog. Note: This page does not cover all the intricacies of how queries are run in PostHog. Insights counting unique persons This section covers how PostHog determines the number of unique users who performed a certain action, such as when creating a Trends or Funnel insight. In this case, PostHog determines this number by counting the total number of unique person_id 's on events that match your…

Read next article