Project author: lwinterface

Project description:
NATS messaging framework for microservices
Language: Python
Repository: git://github.com/lwinterface/panini.git
Created: 2020-09-18T02:21:27Z
Community: https://github.com/lwinterface/panini

License: MIT License


Panini

Panini is a Python microframework based on the nats.py library. Its goal is to give developers an easy way to create NATS microservices with a low barrier to entry. It provides a template for building microservices, similar to FastAPI, Aiohttp, or Flask. Like all of those frameworks, Panini has its design limits and edge cases. If you find yourself restricted by Panini's capabilities, we recommend switching to nats.py.


Panini was inspired by the Faust project.


Documentation

Documentation is here.

How to install

Before getting started, make sure you have all the prerequisites installed:

```shell
pip install panini
```

A simple listener app example

A minimal listener app with one request endpoint and one stream endpoint might look like this:

```python
from panini import app as panini_app

app = panini_app.App(
    service_name='listener_app',
    host='127.0.0.1',
    port=4222,
)

@app.listen("some.subject.for.request")
async def request_listener(msg):
    """Request endpoint"""
    print(f"request {msg.data} from {msg.subject} has been processed")
    return {"success": True, "message": "request has been processed"}

@app.listen("some.subject.for.stream")
async def stream_listener(msg):
    """Stream endpoint"""
    print(f"event {msg.data} from {msg.subject} has been processed")

if __name__ == "__main__":
    app.start()
```

What’s going on here?

  1. Imported Panini.
  2. Initialized the app: created an instance of class App from the panini module with a microservice name, NATS host, and port.
  3. The first @app.listen registers request_listener on the subject "some.subject.for.request". Every time this app receives a request addressed to "some.subject.for.request", request_listener is called to process it, and its return value is sent back to the requester.
  4. The second @app.listen registers stream_listener on the subject "some.subject.for.stream". Same as request_listener, but no result is sent back.
  5. app.start() runs the app. Code after this call is never executed.
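
The registration in steps 3 and 4 can be pictured as a subject-to-handler mapping. Below is a stdlib-only sketch of that idea (illustrative, not Panini's actual implementation; `listen`, `handlers`, and `dispatch` are hypothetical names):

```python
import asyncio

# Sketch of the registry a decorator like @app.listen builds up: subject -> handler.
handlers = {}

def listen(subject):
    def decorator(func):
        handlers[subject] = func   # register the coroutine under its subject
        return func
    return decorator

@listen("some.subject.for.request")
async def request_listener(msg):
    return {"success": True, "message": "request has been processed"}

async def dispatch(subject, msg):
    # look up the handler registered for the incoming subject and await it
    return await handlers[subject](msg)

print(asyncio.run(dispatch("some.subject.for.request", {"data": "request1234567890"})))
# -> {'success': True, 'message': 'request has been processed'}
```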

Save the above code to a file named listener_app.py.

Make sure you have all the prerequisites from the install step, then open a terminal and run the app:

```
> python3 listener_app.py
======================================================================================
Panini service connected to NATS..
id: 3
name: listener_app__non_docker_env_270377__75017
NATS brokers:
* nats://127.0.0.1:4222
======================================================================================
```

That’s it. Now let’s create something that will generate messages.

A simple app example that generates messages

Our goal here is to trigger the endpoints of the listener app above:

  • "some.subject.for.request" - request something and receive a response
  • "some.subject.for.stream" - send an event without waiting for a response
```python
from panini import app as panini_app

app = panini_app.App(
    service_name='sender_app',
    host='127.0.0.1',
    port=4222,
)

@app.task(interval=1)
async def request_periodically():
    message = {"data": "request1234567890"}
    response = await app.request(
        subject="some.subject.for.request",
        message=message,
    )
    print(response)

@app.task(interval=1)
async def publish_periodically():
    message = {"data": "event1234567890"}
    await app.publish(
        subject="some.subject.for.stream",
        message=message,
    )

if __name__ == "__main__":
    app.start()
```

What’s new here:

  1. First, @app.task registers request_periodically to be called periodically at the given interval, every 1 second in this example.
  2. app.request sends a request and asynchronously waits for the response.
  3. The second @app.task does the same as the first one, but for publishing.
  4. app.publish sends a message like a request but without expecting any response. Fire and forget.
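
The difference between the two patterns can be sketched with plain asyncio (illustrative only, not Panini's implementation; `handler`, `request`, and `publish` here are hypothetical stand-ins): a request awaits a reply, a publish schedules delivery and returns immediately.

```python
import asyncio

async def handler(message):
    # stands in for request_listener on the other service
    return {"success": True, "message": "request has been processed"}

async def request(message):
    # like app.request: send and await the handler's reply
    return await handler(message)

async def publish(message):
    # like app.publish: schedule delivery without waiting for any result
    asyncio.get_running_loop().create_task(handler(message))

async def main():
    reply = await request({"data": "request1234567890"})
    print(reply)  # -> {'success': True, 'message': 'request has been processed'}
    await publish({"data": "event1234567890"})  # returns immediately, no reply
    await asyncio.sleep(0)  # give the fire-and-forget task a chance to run

asyncio.run(main())
```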

Save the code to a new file named sender_app.py.

Make sure listener_app.py is still running, then open a new terminal session and run the sender app:

```
> python3 sender_app.py
======================================================================================
Panini service connected to NATS..
id: 3
name: sender_app__non_docker_env_270377__75017
NATS brokers:
* nats://127.0.0.1:4222
======================================================================================
{'success': True, 'message': 'request has been processed'}
{'success': True, 'message': 'request has been processed'}
{'success': True, 'message': 'request has been processed'}
{'success': True, 'message': 'request has been processed'}
{'success': True, 'message': 'request has been processed'}
{'success': True, 'message': 'request has been processed'}
{'success': True, 'message': 'request has been processed'}
{'success': True, 'message': 'request has been processed'}
```

Note that in the terminal session where listener_app.py is running, you should see the received requests and events:

```
event {'data': 'event1234567890'} from some.subject.for.stream has been processed
request {'data': 'request1234567890'} from some.subject.for.request has been processed
event {'data': 'event1234567890'} from some.subject.for.stream has been processed
request {'data': 'request1234567890'} from some.subject.for.request has been processed
event {'data': 'event1234567890'} from some.subject.for.stream has been processed
request {'data': 'request1234567890'} from some.subject.for.request has been processed
event {'data': 'event1234567890'} from some.subject.for.stream has been processed
request {'data': 'request1234567890'} from some.subject.for.request has been processed
event {'data': 'event1234567890'} from some.subject.for.stream has been processed
request {'data': 'request1234567890'} from some.subject.for.request has been processed
event {'data': 'event1234567890'} from some.subject.for.stream has been processed
request {'data': 'request1234567890'} from some.subject.for.request has been processed
```
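
The dict messages printed above travel over NATS as bytes, so a dict payload implies a serialization round trip. A minimal sketch of that round trip, assuming JSON (Panini's exact serialization is internal to the framework):

```python
import json

# What a dict message implies on the wire: NATS payloads are bytes.
message = {"data": "event1234567890"}
payload = json.dumps(message).encode()   # dict -> bytes before publishing
decoded = json.loads(payload.decode())   # bytes -> dict on the listener side
assert decoded == message
print(payload)  # -> b'{"data": "event1234567890"}'
```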

More possibilities

In the first example, we created an application that listens for messages; in the second, an application that sends them. Panini allows you to freely combine sending and receiving messages in one application.

Let’s check out what else you can do with Panini using a minimal interface:

  • One-time tasks on start. Similar to the periodic task above, but without the interval argument:

```python
@app.task()
async def publish():
    while True:
        message = get_some_update()
        await app.publish(subject='some.subject', message=message)
```
  • Synchronous endpoints:

```python
@app.task(interval=2)
def your_periodic_task():
    for _ in range(10):
        app.publish_sync(
            subject='some.publish.subject',
            message={'some': 'data'}
        )
```
  • Accept different data types: dict, str, bytes:

```python
@app.task(interval=2)
def your_periodic_task():
    for _ in range(10):
        app.publish_sync(
            subject='some.publish.subject',
            message=b'messageinbytestosend',
            data_type=bytes
        )
```
  • Create middlewares for NATS messages:

```python
from panini.middleware import Middleware

class MyMiddleware(Middleware):
    async def send_publish(self, subject, message, publish_func, **kwargs):
        print('do something before publish')
        await publish_func(subject, message, **kwargs)
        print('do something after publish')

    async def listen_publish(self, msg, cb):
        print('do something before listen')
        await cb(msg)
        print('do something after listen')

    async def send_request(self, subject, message, request_func, **kwargs):
        print('do something before send request')
        result = await request_func(subject, message, **kwargs)
        print('do something after send request')
        return result

    async def listen_request(self, msg, cb):
        print('do something before listen request')
        result = await cb(msg)
        print('do something after listen request')
        return result
```
  • Create HTTP endpoints with Aiohttp and NATS endpoints together in one microservice:

```python
from aiohttp import web

@app.listen('some.publish.subject')
async def subject_for_requests_listener(msg):
    handle_incoming_message(msg.subject, msg.data)

@app.http.get('/get')
async def web_endpoint_listener(request):
    """Single HTTP endpoint"""
    return web.Response(text="Hello, world")

@app.http.view('/path/to/rest/endpoints')
class MyView(web.View):
    """HTTP endpoints for a REST schema"""

    async def get(self):
        request = self.request
        return web.Response(text="Hello, REST world")

    async def post(self):
        request = self.request
        return web.Response(text="Hello, REST world")
```
  • Built-in traffic balancing between instances of the microservice if you have high loads:

```python
app = panini_app.App(
    service_name='async_publish',
    host='127.0.0.1',
    allocation_queue_group='group24',
    port=4222,
)
# incoming traffic will be distributed among
# all microservices that are in "group24"
```
Need more examples? Check here.

Testing

We use pytest for testing.

To run the tests (note that a nats-server must be running on port 4222 for the tests):

```shell
pytest
```

Contributing

Welcome, contributor! We are looking for developers to help make Panini a great project.

Working on your first Pull Request? You can learn how from this free series, How to Contribute to an Open Source Project on GitHub.

Here’s how you can help:

  • suggest new features or report bugs here
  • review a pull request
  • fix an issue
  • write a tutorial
  • always follow this guide for your contributions

At this point, you’re ready to make your changes! Feel free to ask for help :smile_cat: