Serving deep learning models easily.
This project is just a proof of concept. Check out MOSEC for production usage.
Install: `pip install ventu`
Features:

- implement only the model methods you need (`preprocess`, `postprocess`, and `inference` or `batch_inference`)
- request and response validation with `pydantic`
- OpenAPI document, health check and Prometheus metrics endpoints when run with `run_http`
- if you run multiple workers, remember to set the `prometheus_multiproc_dir` environment variable to a directory

How to use:

- define the request and response data schemas with `pydantic`
  - add examples to `schema.Config.schema_extra['examples']` for warm-up and health check (optional)
- inherit `ventu.Ventu` and implement the `preprocess` and `postprocess` methods
- for a standalone HTTP service, implement the `inference` method and run with `run_http`
- for a worker behind the dynamic batching service, implement the `batch_inference` method and run with `run_socket`

Check the document for API details.
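To make the shape of the API concrete before the full example below, here is a minimal HTTP-only sketch; the `LengthModel` class and its toy schemas are made up for illustration:

```python
from pydantic import BaseModel
from ventu import Ventu


# toy request/response schemas, just for illustration
class Req(BaseModel):
    text: str


class Resp(BaseModel):
    length: int


class LengthModel(Ventu):
    def preprocess(self, data: Req):
        # turn the validated request into model input
        return data.text

    def inference(self, data):
        # single-sample inference, used in `http` mode
        return len(data)

    def postprocess(self, data):
        # return data matching the response schema
        return {'length': data}


if __name__ == '__main__':
    # request and response schemas are passed to the constructor,
    # the same way the full example below does it
    LengthModel(Req, Resp).run_http('localhost', 8080)
```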
The demo code can be found in the `examples` directory.
Install the requirements first: `pip install numpy torch transformers httpx`
```python
import argparse
import logging

import numpy as np
import torch
from pydantic import BaseModel, confloat, constr
from transformers import DistilBertTokenizer, DistilBertForSequenceClassification

from ventu import Ventu


# request schema used for validation
class Req(BaseModel):
    # the input sentence should be at least 2 characters
    text: constr(min_length=2)

    class Config:
        # examples used for health check and warm-up
        schema_extra = {
            'example': {'text': 'my cat is very cute'},
            'batch_size': 16,
        }


# response schema used for validation
class Resp(BaseModel):
    positive: confloat(ge=0, le=1)
    negative: confloat(ge=0, le=1)


class ModelInference(Ventu):
    def __init__(self, *args, **kwargs):
        # initialize the super class with request & response schemas and configs
        super().__init__(*args, **kwargs)
        # initialize the model and tokenizer
        self.tokenizer = DistilBertTokenizer.from_pretrained(
            'distilbert-base-uncased')
        self.model = DistilBertForSequenceClassification.from_pretrained(
            'distilbert-base-uncased-finetuned-sst-2-english')

    def preprocess(self, data: Req):
        # preprocess a request (as defined in the request schema)
        tokens = self.tokenizer.encode(data.text, add_special_tokens=True)
        return tokens

    def batch_inference(self, data):
        # batch inference is used in socket (`unix`/`tcp`) mode
        data = [torch.tensor(token) for token in data]
        with torch.no_grad():
            result = self.model(torch.nn.utils.rnn.pad_sequence(data, batch_first=True))[0]
        return result.numpy()

    def inference(self, data):
        # single-sample inference is used in `http` mode
        with torch.no_grad():
            result = self.model(torch.tensor(data).unsqueeze(0))[0]
        return result.numpy()[0]

    def postprocess(self, data):
        # postprocess the model output into the shape defined by the response schema
        scores = (np.exp(data) / np.exp(data).sum(-1, keepdims=True)).tolist()
        return {'negative': scores[0], 'positive': scores[1]}


def create_model():
    logger = logging.getLogger()
    formatter = logging.Formatter(
        fmt='%(asctime)s - %(levelname)s - %(module)s - %(message)s')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)

    model = ModelInference(Req, Resp, use_msgpack=True)
    return model


def create_app():
    """for gunicorn"""
    return create_model().app


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Ventu service')
    parser.add_argument('--mode', '-m', default='http', choices=('http', 'unix', 'tcp'))
    parser.add_argument('--host', default='localhost')
    parser.add_argument('--port', '-p', default=8080, type=int)
    parser.add_argument('--socket', '-s', default='batching.socket')
    args = parser.parse_args()

    model = create_model()
    if args.mode == 'unix':
        model.run_unix(args.socket)
    elif args.mode == 'tcp':
        model.run_tcp(args.host, args.port)
    else:
        model.run_http(args.host, args.port)
```
You can run this script as:
- a single-threaded HTTP service: `python examples/app.py`
- an HTTP service with multiple workers: `gunicorn -w 2 -b localhost:8080 'examples.app:create_app()'`
  - when run as an HTTP service, the following endpoints are exposed:
    - `/metrics`: Prometheus metrics
    - `/health`: health check
    - `/inference`: inference
    - `/apidoc/redoc` or `/apidoc/swagger`: OpenAPI document
- an inference worker behind the dynamic batching service: `python examples/app.py -m unix` (Unix domain socket) or `python examples/app.py -m tcp --host localhost --port 8888` (TCP); the batching service needs to be running first
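For a quick smoke test of a running HTTP service, a sketch like the following can be used (it assumes the service listens on `localhost:8080` and that the health and metrics endpoints accept plain GET requests):

```python
import httpx

BASE = 'http://localhost:8080'

# /health is the health check endpoint and /metrics exposes Prometheus
# metrics, as listed above; a 200 on both means the service is up
for path in ('/health', '/metrics'):
    resp = httpx.get(BASE + path)
    print(path, resp.status_code)
```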
A client that sends concurrent msgpack requests to the `/inference` endpoint:

```python
from concurrent import futures

import httpx
import msgpack

URL = 'http://localhost:8080/inference'
HEADER = {'Content-Type': 'application/msgpack'}
packer = msgpack.Packer(
    autoreset=True,
    use_bin_type=True,
)


def request(text):
    return httpx.post(URL, data=packer.pack({'text': text}), headers=HEADER)


if __name__ == "__main__":
    with futures.ThreadPoolExecutor() as executor:
        text = [
            'They are smart',
            'what is your problem?',
            'I hate that!',
            'x',
        ]
        results = executor.map(request, text)
        for i, resp in enumerate(results):
            print(
                f'>> {text[i]} -> [{resp.status_code}]\n'
                f'{msgpack.unpackb(resp.content)}'
            )
```
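Note that the last input, `'x'`, is only one character, so it should be rejected by the request schema's `constr(min_length=2)` constraint instead of reaching the model; the exact error status code returned depends on the validation layer.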