项目作者: saboyle

项目描述 :
Example bridge between postgres and nats message queue.
高级语言: Python
项目地址: git://github.com/saboyle/asyncio-pg-nats-bridge.git
创建时间: 2019-03-24T16:49:01Z
项目社区:https://github.com/saboyle/asyncio-pg-nats-bridge

开源协议:

下载


asyncio-pg-nats-bridge

Example bridge between postgres and nats message queue.

Functionality

  • A service LISTENs to a defined postgres channel.
  • A table with an attached trigger is created in Postgres.
  • An ON UPDATE trigger issues a NOTIFY for the defined channel showing the updated / inserted values.
  • The bridging service asynchronously receives the NOTIFY and publishes onto a receiving NATS message queue.
  • A wiretap listens to the queue and emit the notify message to the display.

Future (possible enhancements)

To demonstrate

  1. Run NATS docker image.
  2. Run Postgres docker image and initialise database.
  3. Run the wiretap to emit received notification messages.
  4. Run the simulator to generate test messages.

References

Notes

  1. # Running the NATS docker image
  2. docker run -p 4222:4222 -p 8222:8222 -p 6222:6222 --name gnatsd -ti nats:latest
  3. #### Running the Postgres docker image
  4. docker run -p 5432:5432 --name postgres -e POSTGRES_PASSWORD=password -d postgres
  5. #### To access docker pgdb using local psql
  6. psql -h localhost -U postgres
  7. #### To access psql through running docker image
  8. docker exec -it postgres psql -U postgres
  1. #### Database setup
  2. CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
  3. -- CREATE TABLE fixtures (fixture_id uuid primary key, params json not null);
  4. DROP TABLE fixtures;
  5. CREATE TABLE fixtures (fixture_id int primary key, params int not null);
  6. #### Create trigger with Notify on Create, Update or Delete
  7. begin;
  8. create or replace function tg_notify ()
  9. returns trigger
  10. language plpgsql
  11. as $$
  12. declare
  13. channel text := TG_ARGV[0];
  14. begin
  15. PERFORM (
  16. with payload(key, params) as
  17. (
  18. select NEW.fixture_id, NEW.params as fixs
  19. )
  20. select pg_notify(channel, row_to_json(payload)::text)
  21. from payload
  22. );
  23. RETURN NULL;
  24. end;
  25. $$;
  26. CREATE TRIGGER notify_fixtures
  27. AFTER INSERT
  28. ON fixtures
  29. FOR EACH ROW
  30. EXECUTE PROCEDURE tg_notify('fixtures.parameters');
  31. commit;
  32. -- To setup listeners to Async notify channel.
  33. LISTEN fixtures.parameters;
  34. -- Inserting a dummy record to test async notifications.
  35. insert into fixtures values (4,1);

Dependencies

  1. pip install --upgrade pip
  2. pip install asyncio
  3. pip install asyncpg
  4. pip install asyncio-nats-client