Project author: ihippik

Project description:
PostgreSQL WAL listener
Primary language: Go
Project URL: git://github.com/ihippik/wal-listener.git
Created: 2020-01-07T18:06:32Z
Project community: https://github.com/ihippik/wal-listener

License: Apache License 2.0

WAL-Listener

A service that helps implement the Event-Driven architecture.

To keep data consistent across the system, we use transactional messaging:
events are published in a single transaction with the domain model change.

The service allows you to subscribe to changes in a PostgreSQL database using its logical decoding capability
and publish them to a message broker.

Logic of work

To receive events about data changes in our PostgreSQL DB,
we use the standard logical decoding plugin (pgoutput). This plugin converts
changes read from the WAL into the logical replication protocol.
The service consumes this stream on its side,
filters out only the events we need, and publishes them to the queue.

Event publishing

Choose the message broker via the publisher type setting:

  • NATS JetStream [type=nats];
  • Apache Kafka [type=kafka];
  • RabbitMQ [type=rabbitmq];
  • Google Pub/Sub [type=google_pubsub].

The service publishes the following structure.
The name of the topic to subscribe to is formed from the topic prefix,
the schema name, and the table name: prefix + schema_table.

  {
    ID        uuid.UUID       # unique ID
    Schema    string
    Table     string
    Action    string
    Data      map[string]any
    DataOld   map[string]any  # old data (see DB-settings note #1)
    EventTime time.Time       # commit time
  }

Messages are published to the broker at least once!
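
Because delivery is at least once, a consumer may see the same event more than once. The sketch below shows one possible way to make a handler idempotent by remembering event IDs. It assumes the ID stays the same when an event is redelivered, which this README does not state; Deduplicator and Handle are hypothetical names, and ID is a plain string here only to avoid third-party imports.

```go
package main

import "fmt"

// Event mirrors the fields listed above. ID is a plain string here
// (instead of uuid.UUID) so the sketch needs no third-party packages.
type Event struct {
	ID     string
	Schema string
	Table  string
	Action string
	Data   map[string]any
}

// Deduplicator remembers the IDs of events that were already handled.
// Hypothetical helper: an in-memory set; a real consumer would likely
// persist this state somewhere durable.
type Deduplicator struct {
	seen map[string]struct{}
}

// NewDeduplicator returns an empty Deduplicator.
func NewDeduplicator() *Deduplicator {
	return &Deduplicator{seen: make(map[string]struct{})}
}

// Handle runs fn only the first time a given event ID is observed
// and reports whether fn was run.
func (d *Deduplicator) Handle(e Event, fn func(Event)) bool {
	if _, dup := d.seen[e.ID]; dup {
		return false
	}
	d.seen[e.ID] = struct{}{}
	fn(e)
	return true
}

func main() {
	d := NewDeduplicator()
	e := Event{ID: "a1", Schema: "public", Table: "users", Action: "insert"}

	handled := 0
	for i := 0; i < 2; i++ { // simulate one redelivery of the same event
		d.Handle(e, func(Event) { handled++ })
	}
	fmt.Println("handled:", handled) // handled: 1
}
```

If the ID turns out not to be stable across redeliveries, the same pattern can be keyed on whatever the consumer treats as the identity of a change.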

Filter configuration example

  databases:
    filter:
      tables:
        users:
          - insert
          - update

This filter means that we only process events for the users table,
and only its insert and update actions.
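
The filtering rule can be sketched in Go roughly as follows. This is a minimal illustration, not the service's actual code: the Filter type and Allows method are hypothetical names, and the sketch assumes that a table missing from the filter is skipped entirely (how an empty filter behaves is not covered here).

```go
package main

import "fmt"

// Filter maps a table name to the actions that should be processed,
// mirroring the "tables" section of the configuration above.
type Filter map[string][]string

// Allows reports whether an event for the given table and action
// passes the filter.
// Assumption: a table that is not listed is skipped entirely.
func (f Filter) Allows(table, action string) bool {
	actions, ok := f[table]
	if !ok {
		return false
	}
	for _, a := range actions {
		if a == action {
			return true
		}
	}
	return false
}

func main() {
	f := Filter{"users": {"insert", "update"}}

	fmt.Println(f.Allows("users", "insert"))  // true
	fmt.Println(f.Allows("users", "delete"))  // false
	fmt.Println(f.Allows("orders", "insert")) // false
}
```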

Topic mapping

By default, the output topic name consists of the prefix, the DB schema, and the DB table name,
but if you want to send all updates to one topic, configure the topic map:

  topicsMap:
    main_users: "notifier"
    main_customers: "notifier"
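
One way to picture how a topic name is resolved is the sketch below. The function topicName is a hypothetical helper, not taken from the project; in particular, whether the prefix is also applied to mapped names is an assumption of this sketch.

```go
package main

import "fmt"

// topicName returns the topic for an event. The default name is
// "<schema>_<table>"; if topicsMap has an entry for that name, the
// mapped value is used instead.
// Assumption: the prefix is prepended in both cases.
func topicName(prefix, schema, table string, topicsMap map[string]string) string {
	name := schema + "_" + table
	if mapped, ok := topicsMap[name]; ok { // lookup in a nil map is safe
		name = mapped
	}
	return prefix + name
}

func main() {
	m := map[string]string{
		"main_users":     "notifier",
		"main_customers": "notifier",
	}

	fmt.Println(topicName("pre_", "main", "users", m))  // pre_notifier
	fmt.Println(topicName("pre_", "main", "orders", m)) // pre_main_orders
}
```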

DB setting

You must set the following in the DB configuration (postgresql.conf):

  • wal_level = logical
  • max_replication_slots >= 1

The publication and the slot are created automatically when the service starts (for all tables and all actions).
You can delete the default publication and create your own (name: wal-listener) with the necessary filtering conditions; the filtering will then occur at the database level rather than at the application level.

https://www.postgresql.org/docs/current/sql-createpublication.html

If you change the publication, do not forget to change the slot name or delete the current one.

Notes:

  1. To receive the DataOld field, you need to change REPLICA IDENTITY to FULL as described here:
    #SQL-ALTERTABLE-REPLICA-IDENTITY

Service configuration

  listener:
    slotName: myslot_1
    refreshConnection: 30s
    heartbeatInterval: 10s
    filter:
      tables:
        seasons:
          - insert
          - update
    topicsMap:
      schema_table_name: "notifier"
  logger:
    level: info
    fmt: json
  database:
    host: localhost
    port: 5432
    name: my_db
    user: postgres
    password: postgres
    debug: false
  publisher:
    type: nats
    address: localhost:4222
    topic: "wal_listener"
    topicPrefix: ""
  monitoring:
    sentryDSN: "dsn string"
    promAddr: ":2112"

We are using Viper, which means you can override each value via environment variables with the WAL_ prefix.

For instance: WAL_DATABASE_PORT=5433
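
The naming convention implied by the example above can be sketched as a small Go helper. This only illustrates how a config path maps to a variable name (prefix, upper case, dots replaced by underscores); envName is a hypothetical function and is not part of Viper or of this project.

```go
package main

import (
	"fmt"
	"strings"
)

// envName converts a dotted config path such as "database.port" into
// the environment variable name suggested by the README example:
// WAL_DATABASE_PORT.
// This illustrates the naming convention only; it is not Viper's code.
func envName(path string) string {
	return "WAL_" + strings.ToUpper(strings.ReplaceAll(path, ".", "_"))
}

func main() {
	fmt.Println(envName("database.port"))  // WAL_DATABASE_PORT
	fmt.Println(envName("publisher.type")) // WAL_PUBLISHER_TYPE
}
```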

Monitoring

Sentry

If you specify a DSN string for the Sentry project, errors of the following levels will be sent there via a hook:

  • Panic
  • Fatal
  • Error

Prometheus

You can collect metrics by specifying an endpoint for Prometheus in the configuration (promAddr).

Available metrics

name                         description                           fields
published_events_total       the total number of published events  subject, table
filter_skipped_events_total  the total number of skipped events    table

Kubernetes

The application starts a web server (if a port is specified in the configuration) with two endpoints
for the readiness (/ready) and liveness (/healthz) probes.

Docker

You can start the container from the project folder (configuration file is required).

See ./config_example.yml for an example configuration.
Be sure to copy the file into the Docker image in the Dockerfile, after the build step and before running,
e.g.:

  COPY /config.yml .

The container is prepared with a multi-stage build, which leaves behind large auxiliary images.
Please don’t forget to delete them:

  docker image prune --filter label=stage=builder

Docker Hub

https://hub.docker.com/r/ihippik/wal-listener

Example

  docker run -v $(pwd)/config.yml:/app/config.yml ihippik/wal-listener:tag