Project author: PierreKieffer

Project description: MongoDB streaming service that launches asynchronous streams in parallel
Language: Go
Repository: git://github.com/PierreKieffer/mongoStream.git
Created: 2020-11-18T10:52:28Z
Project community: https://github.com/PierreKieffer/mongoStream

License: BSD 2-Clause "Simplified" License


mongoStream



mongoStream is a MongoDB streaming service built on the MongoDB change streams feature.

The service lets you launch several asynchronous streams in parallel.

Requirements

  • System requirements:

    • MongoDB 4.2 or higher
    • an available MongoDB replica set
  • Dependencies:

    • MongoDB Go driver: go get go.mongodb.org/mongo-driver/mongo

Installation

go get github.com/PierreKieffer/mongoStream

Usage

To get started, import the mongoStream services package:

    import (
        "github.com/PierreKieffer/mongoStream/services"
    )

Set up the MongoDB connection string:

    mongoUri := "mongodb://localhost:27017"

Initialize the message buffer:

The buffer is a channel that accumulates the messages generated by the streaming service. The initialization takes the buffer size as a parameter (i.e. the number of messages the buffer can hold).

    var buffer = services.InitBuffer(10)
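
Since the buffer is described as a channel with a fixed size, the behaviour of a plain Go buffered channel gives a rough idea of what to expect once it fills up. The sketch below uses only the standard library; `Message`, `NewBuffer` and `TrySend` are hypothetical names for illustration and are not part of mongoStream.

```go
package main

import "fmt"

// Message is a hypothetical stand-in for the event type carried by the buffer.
type Message struct {
	OperationType string
}

// NewBuffer returns a buffered channel that can hold up to size messages.
// It only illustrates the idea; it is not the library's InitBuffer.
func NewBuffer(size int) chan Message {
	return make(chan Message, size)
}

// TrySend attempts a non-blocking send and reports whether the message was queued.
func TrySend(buf chan Message, m Message) bool {
	select {
	case buf <- m:
		return true
	default:
		return false // the buffer is full
	}
}

func main() {
	buf := NewBuffer(2)
	fmt.Println(TrySend(buf, Message{OperationType: "insert"})) // true
	fmt.Println(TrySend(buf, Message{OperationType: "update"})) // true
	fmt.Println(TrySend(buf, Message{OperationType: "delete"})) // false
	fmt.Println(len(buf), cap(buf))                             // 2 2
}
```

A plain send (`buf <- m`) on a full buffered channel blocks until a consumer reads from it, so the buffer size bounds how far producers can run ahead of the consumer.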

Start one or more workers in parallel:

Parameters:

  • MongoDB connection string
  • database name
  • collection name
  • buffer
  • stream options [optional]

    go services.ListenerWorker(mongoUri, "database", "collection1", buffer)
    go services.ListenerWorker(mongoUri, "database", "collection2", buffer)
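
Several workers writing to one shared buffer is the usual Go fan-in pattern. The following is a minimal sketch of that pattern with the standard library only; `produce` and `FanIn` are hypothetical stand-ins and do not reflect how `ListenerWorker` is implemented.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// produce is a hypothetical stand-in for a listener worker: it sends n
// labelled events for one collection into the shared buffer.
func produce(collection string, n int, buffer chan<- string, wg *sync.WaitGroup) {
	defer wg.Done()
	for i := 0; i < n; i++ {
		buffer <- fmt.Sprintf("%s#%d", collection, i)
	}
}

// FanIn starts one producer per collection and returns every event received.
func FanIn(collections []string, perCollection int) []string {
	buffer := make(chan string, 10)
	var wg sync.WaitGroup
	for _, c := range collections {
		wg.Add(1)
		go produce(c, perCollection, buffer, &wg)
	}
	// Close the buffer once all producers are done so the range loop ends.
	go func() {
		wg.Wait()
		close(buffer)
	}()
	var events []string
	for e := range buffer {
		events = append(events, e)
	}
	// Arrival order across goroutines is not deterministic, so sort for display.
	sort.Strings(events)
	return events
}

func main() {
	fmt.Println(FanIn([]string{"collection1", "collection2"}, 2))
}
```

Events from different collections interleave in arrival order, which is why a consumer typically relies on the namespace carried by each message rather than on ordering.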

Structure of the messages generated by the workers and received by the buffer:

    {
        "DocumentKey": {
            "DocumentId": "string type"
        },
        "Namespace": {
            "Database": "string type",
            "Collection": "string type"
        },
        "OperationType": "string type",
        "UpdateDescription": {
            "updatedFields": "map[string]interface{} type"
        }
    }
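
For readers who want to handle such messages as typed values, the structure above can be mirrored by a Go struct. This is only a sketch: the `Event` type and `Decode` helper are hypothetical, and the library's own `dataModel.Oplog` type may be declared differently.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event is a hypothetical struct mirroring the documented message layout.
// Field names follow the JSON keys shown in the README.
type Event struct {
	DocumentKey struct {
		DocumentId string
	}
	Namespace struct {
		Database   string
		Collection string
	}
	OperationType     string
	UpdateDescription struct {
		UpdatedFields map[string]interface{} `json:"updatedFields"`
	}
}

// Decode parses a JSON-encoded message into an Event.
func Decode(raw []byte) (Event, error) {
	var e Event
	err := json.Unmarshal(raw, &e)
	return e, err
}

func main() {
	raw := []byte(`{
		"DocumentKey": {"DocumentId": "abc"},
		"Namespace": {"Database": "database", "Collection": "collection1"},
		"OperationType": "update",
		"UpdateDescription": {"updatedFields": {"status": "done"}}
	}`)
	e, err := Decode(raw)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Println(e.OperationType, e.Namespace.Collection, e.UpdateDescription.UpdatedFields["status"])
}
```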

To resume a stream from a timestamp:

    cso := services.SetOptions(true, 1586360547)
    go services.ListenerWorker(mongoUri, "database", "collection1", buffer, cso)
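
The numeric argument in the call above looks like a Unix timestamp in seconds; that reading is an assumption, since the README does not state the unit. Under that assumption, the standard library can convert between such a value and a readable date. `ToRFC3339` and `FromRFC3339` are hypothetical helper names.

```go
package main

import (
	"fmt"
	"time"
)

// ToRFC3339 renders a Unix timestamp (in seconds) as a UTC RFC 3339 string.
func ToRFC3339(sec int64) string {
	return time.Unix(sec, 0).UTC().Format(time.RFC3339)
}

// FromRFC3339 parses an RFC 3339 string and returns the Unix timestamp in seconds.
func FromRFC3339(s string) (int64, error) {
	t, err := time.Parse(time.RFC3339, s)
	if err != nil {
		return 0, err
	}
	return t.Unix(), nil
}

func main() {
	fmt.Println(ToRFC3339(1586360547)) // 2020-04-08T15:42:27Z

	sec, err := FromRFC3339("2020-04-08T15:42:27Z")
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println(sec) // 1586360547
}
```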

Example

    package main

    import (
        "log"

        "github.com/PierreKieffer/mongoStream/dataModel"
        "github.com/PierreKieffer/mongoStream/services"
    )

    // exit is never written to, so main blocks on it and keeps the workers alive.
    var exit = make(chan bool)

    func main() {
        mongoUri := "mongodb://localhost:27017"

        // Init oplog buffer channel
        var buffer = services.InitBuffer(10)

        // Start consumer
        go Consumer(buffer)

        // Start producers
        go services.ListenerWorker(mongoUri, "database", "collection1", buffer)
        go services.ListenerWorker(mongoUri, "database", "collection2", buffer)

        <-exit
    }

    // Consumer logs every message received from the buffer.
    func Consumer(logBuffer chan dataModel.Oplog) {
        for {
            log.Println("received data: ", <-logBuffer)
        }
    }