Ingest data from Google Cloud SQL
This page shows you how to stream data from Google Cloud SQL for PostgreSQL to Materialize using thePostgreSQL source.
Before you begin
-
Make sure you are running PostgreSQL 11 or higher.
-
Make sure you have access to your PostgreSQL instance via
psql
, or your preferred SQL client.
A. Configure Google Cloud SQL
1. Enable logical replication
Materialize uses PostgreSQL’s logical replication protocol to track changes in your database and propagate them to Materialize.
To enable logical replication in Cloud SQL, see the Cloud SQL documentation.
2. Create a publication and a replication user
Once logical replication is enabled, the next step is to create a publication with the tables that you want to replicate to Materialize. You’ll also need a user for Materialize with sufficient privileges to manage replication.
-
For each table that you want to replicate to Materialize, set the replica identity to
FULL
:ALTER TABLE <table1> REPLICA IDENTITY FULL;
ALTER TABLE <table2> REPLICA IDENTITY FULL;
REPLICA IDENTITY FULL
ensures that the replication stream includes the previous data of changed rows, in the case ofUPDATE
andDELETE
operations. This setting enables Materialize to ingest PostgreSQL data with minimal in-memory state. However, you should expect increased disk usage in your PostgreSQL database. -
Create a publication with the tables you want to replicate:
For specific tables:
CREATE PUBLICATION mz_source FOR TABLE <table1>, <table2>;
For all tables in the database:
CREATE PUBLICATION mz_source FOR ALL TABLES;
The
mz_source
publication will contain the set of change events generated from the specified tables, and will later be used to ingest the replication stream.Be sure to include only the tables you need. If the publication includes additional tables, Materialize will waste resources on ingesting and then immediately discarding the data.
-
Create a user for Materialize, if you don’t already have one:
CREATE USER materialize PASSWORD '<password>';
-
Grant the user permission to manage replication:
ALTER ROLE materialize WITH REPLICATION;
-
Grant the user the required permissions on the tables you want to replicate:
GRANT CONNECT ON DATABASE <dbname> TO materialize;
GRANT USAGE ON SCHEMA <schema> TO materialize;
GRANT SELECT ON <table1> TO materialize;
GRANT SELECT ON <table2> TO materialize;
Once connected to your database, Materialize will take an initial snapshot of the tables in your publication.
SELECT
privileges are required for this initial snapshot.If you expect to add tables to your publication, you can grant
SELECT
on all tables in the schema instead of naming the specific tables:GRANT SELECT ON ALL TABLES IN SCHEMA <schema> TO materialize;
B. (Optional) Configure network security
There are various ways to configure your database’s network to allow Materialize to connect:
-
Allow Materialize IPs: If your database is publicly accessible, you can configure your database’s firewall to allow connections from a set of static Materialize IP addresses.
-
Use an SSH tunnel: If your database is running in a private network, you can use an SSH tunnel to connect Materialize to the database.
Select the option that works best for you.
-
In the Materialize console’s SQL Shell, or your preferred SQL client connected to Materialize, find the static egress IP addresses for the Materialize region you are running in:
SELECT * FROM mz_egress_ips;
-
Update your Google Cloud SQL firewall rules to allow traffic from each IP address from the previous step.
To create an SSH tunnel from Materialize to your database, you launch an instance to serve as an SSH bastion host, configure the bastion host to allow traffic only from Materialize, and then configure your database’s private network to allow traffic from the bastion host.
-
Launch a GCE instance to serve as your SSH bastion host.
- Make sure the instance is publicly accessible and in the same VPC as your database.
- Add a key pair and note the username. You’ll use this username when connecting Materialize to your bastion host.
- Make sure the VM has a static public IP address. You’ll use this IP address when connecting Materialize to your bastion host.
-
Configure the SSH bastion host to allow traffic only from Materialize.
-
In the Materialize console’s SQL Shell, or your preferred SQL client connected to Materialize, get the static egress IP addresses for the Materialize region you are running in:
SELECT * FROM mz_egress_ips;
-
Update your SSH bastion host’s firewall rules to allow traffic from each IP address from the previous step.
-
-
Update your Google Cloud SQL firewall rules to allow traffic from the SSH bastion host.
C. Ingest data in Materialize
1. (Optional) Create a cluster
quickstart
), you can skip this step. For production
scenarios, we recommend separating your workloads into multiple clusters for
resource isolation.
In Materialize, a cluster is an isolated environment, similar to a virtual warehouse in Snowflake. When you create a cluster, you choose the size of its compute resource allocation based on the work you need the cluster to do, whether ingesting data from a source, computing always-up-to-date query results, serving results to external clients, or a combination.
In this step, you’ll create a dedicated cluster for ingesting source data from your PostgreSQL database.
-
In the SQL Shell, or your preferred SQL client connected to Materialize, use the
CREATE CLUSTER
command to create the new cluster:CREATE CLUSTER ingest_postgres (SIZE = '50cc'); SET CLUSTER = ingest_postgres;
A cluster of size
50cc
should be enough to accommodate multiple PostgreSQL sources, depending on the source characteristics (e.g., sources withENVELOPE UPSERT
orENVELOPE DEBEZIUM
will be more memory-intensive) and the upstream traffic patterns. You can readjust the size of the cluster at any time using theALTER CLUSTER
command:ALTER CLUSTER <cluster_name> SET ( SIZE = <new_size> );
2. Start ingesting data
Now that you’ve configured your database network and created an ingestion cluster, you can connect Materialize to your PostgreSQL database and start ingesting data. The exact steps depend on your networking configuration, so start by selecting the relevant option.
-
In a SQL client connected to Materialize, use the
CREATE SECRET
command to securely store the password for thematerialize
PostgreSQL user you created earlier:CREATE SECRET pgpass AS '<PASSWORD>';
-
Use the
CREATE CONNECTION
command to create a connection object with access and authentication details for Materialize to use:CREATE CONNECTION pg_connection TO POSTGRES ( HOST '<host>', PORT 5432, USER 'materialize', PASSWORD SECRET pgpass, SSL MODE 'require', DATABASE '<database>' );
-
Replace
<host>
with your PostgreSQL endpoint. -
Replace
<database>
with the name of the database containing the tables you want to replicate to Materialize.
-
-
Use the
CREATE SOURCE
command to connect Materialize to your PostgreSQL instance and start ingesting data from the publication you created earlier:CREATE SOURCE mz_source IN CLUSTER ingest_postgres FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source') FOR ALL TABLES;
By default, the source will be created in the active cluster; to use a different cluster, use the
IN CLUSTER
clause. To ingest data from specific schemas or tables in your publication, useFOR SCHEMAS (<schema1>,<schema2>)
orFOR TABLES (<table1>, <table2>)
instead ofFOR ALL TABLES
. -
After source creation, you can handle upstream schema changes for specific replicated tables using the
ALTER SOURCE...ADD SUBSOURCE
andDROP SOURCE
syntax.
-
In the SQL client connected to Materialize, use the
CREATE CONNECTION
command to create an SSH tunnel connection:CREATE CONNECTION ssh_connection TO SSH TUNNEL ( HOST '<SSH_BASTION_HOST>', PORT <SSH_BASTION_PORT>, USER '<SSH_BASTION_USER>' );
- Replace
<SSH_BASTION_HOST>
and<SSH_BASTION_PORT
> with the public IP address and port of the SSH bastion host you created earlier. - Replace
<SSH_BASTION_USER>
with the username for the key pair you created for your SSH bastion host.
- Replace
-
Get Materialize’s public keys for the SSH tunnel connection:
SELECT * FROM mz_ssh_tunnel_connections;
-
Log in to your SSH bastion host and add Materialize’s public keys to the
authorized_keys
file, for example:# Command for Linux echo "ssh-ed25519 AAAA...76RH materialize" >> ~/.ssh/authorized_keys echo "ssh-ed25519 AAAA...hLYV materialize" >> ~/.ssh/authorized_keys
-
Back in the SQL client connected to Materialize, validate the SSH tunnel connection you created using the
VALIDATE CONNECTION
command:VALIDATE CONNECTION ssh_connection;
If no validation error is returned, move to the next step.
-
Use the
CREATE SECRET
command to securely store the password for thematerialize
PostgreSQL user you created earlier:CREATE SECRET pgpass AS '<PASSWORD>';
-
Use the
CREATE CONNECTION
command to create another connection object, this time with database access and authentication details for Materialize to use:CREATE CONNECTION pg_connection TO POSTGRES ( HOST '<host>', PORT 5432, USER 'materialize', PASSWORD SECRET pgpass, DATABASE '<database>', SSH TUNNEL ssh_connection );
-
Replace
<host>
with your PostgreSQL endpoint. -
Replace
<database>
with the name of the database containing the tables you want to replicate to Materialize.
-
-
Use the
CREATE SOURCE
command to connect Materialize to your Azure instance and start ingesting data from the publication you created earlier:CREATE SOURCE mz_source IN CLUSTER ingest_postgres FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source') FOR ALL TABLES;
To ingest data from specific schemas or tables in your publication, use
FOR SCHEMAS (<schema1>,<schema2>)
orFOR TABLES (<table1>, <table2>)
instead ofFOR ALL TABLES
.
3. Monitor the ingestion status
Before it starts consuming the replication stream, Materialize takes a snapshot of the relevant tables in your publication. Until this snapshot is complete, Materialize won’t have the same view of your data as your PostgreSQL database.
In this step, you’ll first verify that the source is running and then check the status of the snapshotting process.
-
Back in the SQL client connected to Materialize, use the
mz_source_statuses
table to check the overall status of your source:WITH source_ids AS (SELECT id FROM mz_sources WHERE name = 'mz_source') SELECT * FROM mz_internal.mz_source_statuses JOIN ( SELECT referenced_object_id FROM mz_internal.mz_object_dependencies WHERE object_id IN (SELECT id FROM source_ids) UNION SELECT id FROM source_ids ) AS sources ON mz_source_statuses.id = sources.referenced_object_id;
For each
subsource
, make sure thestatus
isrunning
. If you seestalled
orfailed
, there’s likely a configuration issue for you to fix. Check theerror
field for details and fix the issue before moving on. Also, if thestatus
of any subsource isstarting
for more than a few minutes, contact our team. -
Once the source is running, use the
mz_source_statistics
table to check the status of the initial snapshot:WITH source_ids AS (SELECT id FROM mz_sources WHERE name = 'mz_source') SELECT sources.referenced_object_id AS id, mz_sources.name, snapshot_committed FROM mz_internal.mz_source_statistics JOIN ( SELECT object_id, referenced_object_id FROM mz_internal.mz_object_dependencies WHERE object_id IN (SELECT id FROM source_ids) UNION SELECT id, id FROM source_ids ) AS sources ON mz_source_statistics.id = sources.referenced_object_id JOIN mz_sources ON mz_sources.id = sources.referenced_object_id;
object_id | snapshot_committed ----------|------------------ u144 | t (1 row)
Once
snapshot_commited
ist
, move on to the next step. Snapshotting can take between a few minutes to several hours, depending on the size of your dataset and the size of the cluster the source is running in.
4. Right-size the cluster
After the snapshotting phase, Materialize starts ingesting change events from
the PostgreSQL replication stream. For this work, Materialize generally
performs well with an 100cc
replica, so you can resize the cluster
accordingly.
-
Still in a SQL client connected to Materialize, use the
ALTER CLUSTER
command to downsize the cluster to100cc
:ALTER CLUSTER ingest_postgres SET (SIZE '100cc');
Behind the scenes, this command adds a new
100cc
replica and removes the50cc
replica. -
Use the
SHOW CLUSTER REPLICAS
command to check the status of the new replica:SHOW CLUSTER REPLICAS WHERE cluster = 'ingest_postgres';
cluster | replica | size | ready -----------------+---------+--------+------- ingest_postgres | r1 | 100cc | t (1 row)
-
Going forward, you can verify that your new cluster size is sufficient as follows:
-
In Materialize, get the replication slot name associated with your PostgreSQL source from the
mz_internal.mz_postgres_sources
table:SELECT d.name AS database_name, n.name AS schema_name, s.name AS source_name, pgs.replication_slot FROM mz_sources AS s JOIN mz_internal.mz_postgres_sources AS pgs ON s.id = pgs.id JOIN mz_schemas AS n ON n.id = s.schema_id JOIN mz_databases AS d ON d.id = n.database_id;
-
In PostgreSQL, check the replication slot lag, using the replication slot name from the previous step:
SELECT pg_size_pretty(pg_current_wal_lsn() - confirmed_flush_lsn) AS replication_lag_bytes FROM pg_replication_slots WHERE slot_name = '<slot_name>';
The result of this query is the amount of data your PostgreSQL cluster must retain in its replication log because of this replication slot. Typically, this means Materialize has not yet communicated back to PostgreSQL that it has committed this data. A high value can indicate that the source has fallen behind and that you might need to scale up your ingestion cluster.
-
D. Explore your data
With Materialize ingesting your PostgreSQL data into durable storage, you can start exploring the data, computing real-time results that stay up-to-date as new data arrives, and serving results efficiently.
-
Explore your data with
SHOW SOURCES
andSELECT
. -
Compute real-time results in memory with
CREATE VIEW
andCREATE INDEX
or in durable storage withCREATE MATERIALIZED VIEW
. -
Serve results to a PostgreSQL-compatible SQL client or driver with
SELECT
orSUBSCRIBE
or to an external message broker withCREATE SINK
. -
Check out the tools and integrations supported by Materialize.
Considerations
Schema changes
Materialize supports schema changes in the upstream database as follows:
Compatible schema changes
-
Adding columns to tables. Materialize will not ingest new columns added upstream unless you use
DROP SOURCE
to first drop the affected subsource, and then add the table back to the source usingALTER SOURCE...ADD SUBSOURCE
. -
Dropping columns that were added after the source was created. These columns are never ingested, so you can drop them without issue.
-
Adding or removing
NOT NULL
constraints to tables that were nullable when the source was created.
Incompatible schema changes
All other schema changes to upstream tables will set the corresponding subsource into an error state, which prevents you from reading from the source.
To handle incompatible schema changes, use DROP SOURCE
and ALTER SOURCE...ADD SUBSOURCE
to first drop the
affected subsource, and then add the table back to the source. When you add the
subsource, it will have the updated schema from the corresponding upstream
table.
Publication membership
PostgreSQL’s logical replication API does not provide a signal when users remove tables from publications. Because of this, Materialize relies on periodic checks to determine if a table has been removed from a publication, at which time it generates an irrevocable error, preventing any values from being read from the table.
However, it is possible to remove a table from a publication and then re-add it before Materialize notices that the table was removed. In this case, Materialize can no longer provide any consistency guarantees about the data we present from the table and, unfortunately, is wholly unaware that this occurred.
To mitigate this issue, if you need to drop and re-add a table to a publication,
ensure that you remove the table/subsource from the source before re-adding it
using the DROP SOURCE
command.
Supported types
Materialize natively supports the following PostgreSQL types (including the array type for each of the types):
bool
bpchar
bytea
char
date
daterange
float4
float8
int2
int2vector
int4
int4range
int8
int8range
interval
json
jsonb
numeric
numrange
oid
text
time
timestamp
timestamptz
tsrange
tstzrange
uuid
varchar
Replicating tables that contain unsupported data types is
possible via the TEXT COLUMNS
option. The specified columns will be treated
as text
, and will thus not offer the expected PostgreSQL type features. For
example:
-
enum
: the implicit ordering of the original PostgreSQLenum
type is not preserved, as Materialize will sort values astext
. -
money
: the resultingtext
value cannot be cast back to e.g.numeric
, since PostgreSQL adds typical currency formatting to the output.
Truncation
Upstream tables replicated into Materialize should not be truncated. If an
upstream table is truncated while replicated, the whole source becomes
inaccessible and will not produce any data until it is recreated. Instead of
truncating, you can use an unqualified DELETE
to remove all rows from the
table:
DELETE FROM t;
Inherited tables
When using PostgreSQL table inheritance,
PostgreSQL serves data from SELECT
s as if the inheriting tables’ data is also
present in the inherited table. However, both PostgreSQL’s logical replication
and COPY
only present data written to the tables themselves, i.e. the
inheriting data is not treated as part of the inherited table.
PostgreSQL sources use logical replication and COPY
to ingest table data, so
inheriting tables’ data will only be ingested as part of the inheriting table,
i.e. in Materialize, the data will not be returned when serving SELECT
s from
the inherited table.
You can mimic PostgreSQL’s SELECT
behavior with inherited tables by creating a
materialized view that unions data from the inherited and inheriting tables
(using UNION ALL
). However, if new tables inherit from the table, data from
the inheriting tables will not be available in the view. You will need to add
the inheriting tables via ADD SUBSOURCE
and create a new view (materialized or
non-) that unions the new table.