# PgOutput Replication Setup
## Setup Logical Replication: PgOutput

PgOutput logical replication can capture deletes, which enables the soft-delete function in DataLakeHouse.io. A main difference with pgoutput is that you must create a publication object in PostgreSQL and also grant the REPLICATION permission to the user querying the replication slot.

Check to see if any logical replication slots already exist by running the command:

```sql
SELECT * FROM pg_catalog.pg_replication_slots;
```

There is almost always one default slot called `pghoard_local` with a `physical` slot type. If this is the only one that exists, then you have no logical replication slots yet.

### Review Current Settings

Execute and record the following settings:

```sql
SELECT setting FROM pg_settings WHERE name = 'wal_level';
SELECT setting FROM pg_settings WHERE name = 'max_replication_slots';
SELECT setting FROM pg_settings WHERE name = 'wal_sender_timeout';
SELECT setting FROM pg_settings WHERE name = 'max_wal_senders';
```

Ensure that:

- `wal_level` = `logical`
- `wal_sender_timeout` = 0 or 60000 (0 = infinity)
- `max_replication_slots` >= 8
- `max_wal_senders` >= 16

### Create a User for DataLakeHouse.io

Replace the placeholders, schema names, etc. below with your actual values and record them for later reference, as you'll need them to set up the user that will have access to the database you connect with in the DataLakeHouse.io connection info. For example, we usually recommend `datalakehouse_sync_svc` as the username.

Create a user:

```sql
CREATE USER <username> PASSWORD 'some_password';
GRANT USAGE ON SCHEMA "public" TO <username>;
GRANT SELECT ON ALL TABLES IN SCHEMA "public" TO <username>;
```

Or, for specifically cherry-picked (selected) tables:

```sql
ALTER DEFAULT PRIVILEGES IN SCHEMA "some_schema" REVOKE SELECT ON TABLES FROM <username>;
-- revoke the tables using this concept
REVOKE SELECT ON ALL TABLES IN SCHEMA "some_schema" FROM <username>;
ALTER DEFAULT PRIVILEGES IN SCHEMA "public" GRANT SELECT ON TABLES TO <username>;
-- then, for continued individual table access
GRANT SELECT ON "some_schema"."some_table" TO <username>;
ALTER DEFAULT PRIVILEGES IN SCHEMA "some_schema" GRANT SELECT ON TABLES TO <username>;
```

As an example, to create a user with access to all tables:

```sql
CREATE USER datalakehouse_sync_svc PASSWORD 'p@ssword1';
GRANT USAGE ON SCHEMA "public" TO datalakehouse_sync_svc;
GRANT SELECT ON ALL TABLES IN SCHEMA "public" TO datalakehouse_sync_svc;
```

### Create a Publication for PgOutput

Create a publication object on the PostgreSQL database using the following logic. As an example, this code creates a publication object with the name `dlh_pub_selected_tables`:

```sql
CREATE PUBLICATION dlh_pub_selected_tables FOR TABLE public.actor, public.category, public.staff, public.film;

-- alternatively, select all tables for the publication
CREATE PUBLICATION dlh_pub_all_tables FOR ALL TABLES;
```

Then confirm that the publication was created:

```sql
SELECT * FROM pg_publication_tables WHERE pubname = 'dlh_pub_selected_tables';
```

Once the publication is created, you can move forward to creating the logical replication slot with the pgoutput option.

### Create the Logical Replication Slot for PgOutput

Creating the logical replication slot requires some basic configuration on the PostgreSQL server. Adjust a few configurations:

```sql
-- set timeout to 30 mins
SET statement_timeout = '1800';
```

Now create the actual logical replication slot. Use specifically the name `datalakehouseio_replication_slot` (or another unique name if this one is already used), and pass `'pgoutput'` as the second argument:

```sql
SELECT pg_create_logical_replication_slot('datalakehouseio_replication_slot', 'pgoutput');
```
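If you would like to double-check the result of the step above, a quick query such as the following (a sketch that assumes the slot name used in this guide) should show the new slot with `pgoutput` in the `plugin` column and a `logical` slot type:

```sql
-- Illustrative check only; the slot name assumes the example above.
SELECT slot_name, plugin, slot_type, active
FROM pg_replication_slots
WHERE slot_name = 'datalakehouseio_replication_slot';
```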
Create a role and grant permissions to the user created previously:

```sql
CREATE ROLE datalakehouse_sync_role;
ALTER ROLE datalakehouse_sync_role WITH REPLICATION;
GRANT datalakehouse_sync_role TO datalakehouse_sync_svc;
```

Alternatively, you can simply alter the user with the syntax:

```sql
ALTER USER <read_only_username> WITH REPLICATION;
```

Though altering a user directly is not preferred over creating a role and assigning the role to the user.

### Alternative Publication Tactics (Not Recommended)

Alternatively, if your team wants the CDC logical replication to track only certain types of events (insert, update, or delete), use the following SQL syntax when creating the publication for the pgoutput plugin extension, adjusting it based on your applied changes from the steps above. For example:

```sql
CREATE PUBLICATION <publication_name> FOR TABLE <table_name> WITH (publish = 'insert, update');
```

## Analyzing Logical Replication Slots

If you are testing, or are concerned that replication is not working, you may run a SELECT statement against the replication slot to determine whether you see changes (inserts, updates, deletes) from your system that you believe should be getting captured. Peeking at the logical changes is the easiest way to check whether logical replication is working and to see if there are any tracked changes flowing through, without actually moving the pointer as the get-changes command does. Using peek changes will just show you the changes without impacting the CDC itself.

```sql
SELECT * FROM pg_logical_slot_peek_binary_changes('datalakehouseio_replication_slot', NULL, NULL, 'proto_version', '1', 'publication_names', 'dlh_pub_selected_tables');
```

[NB] The `pg_current_wal_lsn` value and the LSN distance continue to change, as they are based on the Log Sequence Number (LSN), a 64-bit integer used to determine the position in the write-ahead log (WAL) stream, which preserves data integrity by acting as a pointer to the xlog. The printout is two hexadecimal numbers of up to 8 digits each, separated by a slash (/), for example `63/b30000220`. If you compare two LSNs using basic operators like =, >, <, or −, the result of subtraction is the number of bytes between the two WAL positions.

### Test Your Logical Replication

In order to confirm that your logical replication is working, you can artificially create, or wait for, DML activity (insert, update, delete), then run the above peek-changes command:

```sql
SELECT * FROM pg_logical_slot_peek_changes('datalakehouseio_replication_slot', NULL, NULL);
```

### Get the Last Received WAL Position

This will show the last received and last replayed WAL positions. If there is a delta (in bytes) between `pg_last_wal_receive_lsn` and `pg_last_wal_replay_lsn`, then there is typically a lag before the data is available:

```sql
SELECT pg_last_wal_receive_lsn(), pg_last_wal_replay_lsn(), pg_last_xact_replay_timestamp();
```

### Confirm the Logical Replication Lag/Pointer

To quickly show any flush confirmations from using get changes:

```sql
SELECT slot_name,
       confirmed_flush_lsn,
       pg_current_wal_lsn(),
       (pg_current_wal_lsn() - confirmed_flush_lsn) AS lsn_distance
FROM pg_replication_slots
WHERE slot_name IN ('datalakehouseio_replication_slot');
```

The output will look similar to the following:

```
            slot_name             | confirmed_flush_lsn | pg_current_wal_lsn | lsn_distance
----------------------------------+---------------------+--------------------+--------------
 pghoard_local                    |                     | 63/b00008f0        |
 datalakehouseio_replication_slot | 63/390006b0         | 63/b00008f0        |   1996489280
(2 rows)
```

### Overall Big-Picture Replication View

```sql
SELECT * FROM pg_stat_replication;
```

### Get the Time Lag in Human-Readable Format

```sql
SELECT now() - pg_last_xact_replay_timestamp() AS time_lag;
```
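If it helps for monitoring, the byte arithmetic in the lag query above can also be expressed with `pg_wal_lsn_diff()` and rendered human-readable with `pg_size_pretty()` (PostgreSQL 10 or later); this is an optional sketch rather than part of the required setup:

```sql
-- Sketch only: per-slot WAL retained since the last confirmed flush,
-- shown in human-readable units. Assumes PostgreSQL 10+.
SELECT slot_name,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS retained_wal
FROM pg_replication_slots
WHERE slot_type = 'logical';
```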
### General Logical Replication Slot Details

This table provides basic information about the logical replication slot you have created. When DataLakeHouse.io is retrieving from the slot, the `active` column will typically be set to "t" instead of "f", for true and false respectively.

```sql
SELECT * FROM pg_replication_slots;
```

### Peeking at Logical Slot Changes

There are several parameters for working with the `pg_logical_slot_peek_changes` function, as further described at https://pgpedia.info/p/pg_logical_slot_peek_changes.html, for testing the flow of WAL changes and identifying the LSNs:

```sql
SELECT * FROM pg_logical_slot_peek_changes('datalakehouseio_replication_slot', NULL, NULL, 'include-xids', '0');
```

Changes to `pg_last_wal_replay_lsn` are not leveraged by DataLakeHouse.io, as this relates to a streaming-replication standby instance, whereas DataLakeHouse.io syncs on demand at a scheduled frequency and is query based.

## Disk Space Increase Is Normal

Aiven.io has a good article on the standard operational increase in disk space due to WAL: https://developer.aiven.io/docs/products/postgresql/concepts/pg-disk-usage.html. Run any of the main PostgreSQL commands to check disk space on the instance:

```sql
-- in psql: \l or \l+
SELECT pg_size_pretty(pg_database_size('yourdbname'));
```

## Logical Replication Failures

In a standard process where the DDL of your PostgreSQL tables does not change much, there is very little cause for failure other than having a sync frequency that is not aligned with the volume of data your database tables produce, commensurate with the disk space of the database/server. Since logical replication does not track DDL changes, an error could occur if a DDL change is made but downstream impacts are not considered. In the case of DataLakeHouse.io, a manual change may be required on your target system in order to reflect a DDL change on your source system. When errors occur, if any, please report them immediately by opening a support ticket. Methods used on your source database side to clear issues may include things such as using `pg_replication_origin_advance` to skip the transaction that is failing (see the illustrative sketch after the references below).

## Other References

- PostgreSQL replication slots fields/columns available: https://www.postgresql.org/docs/9.4/catalog-pg-replication-slots.html
- Change data capture in Postgres, how to use logical decoding: https://techcommunity.microsoft.com/t5/azure-database-for-postgresql/change-data-capture-in-postgres-how-to-use-logical-decoding-and/ba-p/1396421
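As an illustration of the "skip ahead" style of intervention mentioned under Logical Replication Failures, a related source-side function is `pg_replication_slot_advance()` (PostgreSQL 11+), which moves a slot's confirmed position forward. The slot name and LSN below are placeholders, and any skipped changes are never replicated, so treat this as a sketch to be used only with support guidance rather than a standard procedure:

```sql
-- Sketch only (PostgreSQL 11+): advance the logical slot past a problematic
-- position so the failing transaction is skipped. The LSN is a placeholder;
-- any changes before it will NOT be replicated downstream.
SELECT pg_replication_slot_advance('datalakehouseio_replication_slot', '63/B00008F0');
```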