Construindo uma aplicação utilizando conceitos de Microsservice, Spring, Kafka, Avro e Schema Registry
ESTUDO JAVA: Solução E-commerce.
Neste projeto prático iremos desenvolver uma solução de e-commerce com a arquitetura de microservices e aplicar a
integração entre eles orientada a eventos com Apache Kafka e garantir a compatibilidade entre da comunicação dos
microservices com Schema Registry. Para isso, programaremos em Java utilizando a stack do Spring
(Spring Boot, Spring Cloud Streams).
#Microsservice #Spring #Kafka #Avro #SchemaRegistry
Primeiros passos para desenvolver um Projeto:
Instalação Windows:
JAVA_HOME
C:\Arquivos de programas\Java\jdk1.8.0
Nova
em Variáveis do sistema;;
;%JAVA_HOME%\lib;%JAVA_HOME%\lib\tools.jar;%JAVA_HOME%\lib\dt.jar;
%JAVA_HOME%\lib\htmlconverter.jar;%JAVA_HOME%\jre\lib;%JAVA_HOME%\jre\lib\rt.jar;
JAVA_HOME
;%JAVA_HOME%\bin
javac -version
e java -version
C:
C:\Program Files\apache-maven-3.8.1\bin
gradle -v
C:
C:\Program Files\gradle-7.0\bin
gradle -v
Instalação Linux — SDKMAN:
Install SDK Man:
apt-get install curl
curl -s "https://get.sdkman.io" | bash
source "/home/user/.sdkman/bin/sdkman-init.sh"
Install Java 8:
sdk install java 8u272-albba
sdk list java
Install maven:
sdk install maven
Install gradle:
sdk install gradle
Primeiro, atualize sua lista existente de pacotes:
sudo apt update
Em seguida, instale alguns pacotes de pré-requisitos que permitem ao apt usar pacotes sobre HTTPS:
sudo apt install apt-transport-https ca-certificates curl software-properties-common
Em seguida, adicione a chave GPG para o repositório oficial do Docker ao seu sistema:
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -
Adicione o repositório Docker às fontes APT:
sudo add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu focal stable"
Em seguida, atualize o banco de dados de pacotes com os pacotes Docker do repo recém-adicionado:
sudo apt update
Certifique-se de que está prestes a instalar a partir do repositório Docker em vez do repositório Ubuntu padrão:
apt-cache policy docker-ce
Observe que docker-ce não está instalado, mas o candidato para instalação é do repositório Docker para Ubuntu 20.04 (focal).
Finalmente, instale o Docker:
sudo apt install docker-ce
O Docker agora deve estar instalado, o daemon iniciado e o processo habilitado para iniciar na inicialização. Verifique se ele está funcionando:
sudo systemctl status docker #para linux
sudo /etc/init.d/docker status #para wsl2
Se quiser evitar digitar sudo sempre que executar o comando docker, adicione seu nome de usuário ao grupo docker:
sudo usermod -aG docker ${USER}
Para aplicar a nova associação de grupo, saia do servidor e entre novamente ou digite o seguinte:
#Você será solicitado a inserir sua senha de usuário para continuar.
su - ${USER}
id -nG
#Output
sammy sudo docker
docker ps
run test docker
docker run hello-world
sudo curl -L "https://github.com/docker/compose/releases/download/1.28.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose
docker-compose --version
Iremos instalar o portainer.io para termos uma visualização dos containers:
docker volume create portainer_data
docker run -d -p 8000:8000 -p 9000:9000 --name=portainer --restart=always -v /var/run/docker.sock:/var/run/docker.sock -v portainer_data:/data portainer/portainer-ce
Ao utilizarmos a docker, podemos nos deparar com a impossibilidade de conectarmos aos serviços de forma externa (pelo navegador)
Para verificar o acesso às portas:
telnet localhost 9000
netcat -l 127.0.0.1 9000
ps aux | grep http
Para solucionar basta:
wsl --shutdown
As ilustrações abaixo foram criadas com asciiflow.
Breve entendimento: O modelo de negócios desse projeto, o usuário vai passar pela tela onde escolhe os produtos,
monta o carrinho. Na hora de fazer o checkout, ele simplesmente informa os dados e na hora que efetua a compra, o
pedido não será processado no mesmo momento em que foi feito,
aparecerá uma tela de “estamos processando seu pedido”, assim como fazem a maioria dos e-commerces.
Reservam o saldo no cartão e faz o processamento depois. Capturaremos da reserva de saldo.
Não iremos processar nada em checkout-api. No momento em que seja feita a cobrança, manda uma notificação que a compra está aprovada.
iremos salvar nosso checkout e então faremos com que a payment-api processe efetivamente o pagamento com os dados enviados pelo checkout.
Domínios:
┌────────────────────────────────┐
│ E-COMMERCE │
│ ┌──────────┐ ┌───────────┐ │
│ │ CHECKOUT │ │ PAYMENT │ │
│ └──────────┘ └───────────┘ │
└────────────────────────────────┘
[checkout]
: guarda as informações de checkout (cartão de crédito/débito, dados de usuários).[payment]
: possuí a responsabilidade de cobrar do cartão com o valor de uma compra “XPTO”.Arquitetura:
[Checkout]
:[Payment]
:
Explicação: abaixo, checkout-api, irá gerar um evento para o kafka, onde payment-api estará escutando.
Ao finaliza o processamento de pagamento, payment-api irá retornar outro evento para o kafka onde checkout-api, irá escutar.
┌──────────────┐ ┌────────────┐ ┌─────┐ ┌───────────┐
│checkout-front├──►│checkout-api├──►│kafka├──►│payment-api│
└──────────────┘ └───────────▲┘ └┬───▲┘ └┬──────────┘
└─────┘ └─────┘
Apache Kafka;
Primeiramente, temos que entender oque é Streaming Data: É basicamente um fluxo contínuo de dados, como um rio
Para quê e por quê utiliza: possuí a capacidade de coletar, processar e armazenar um grande volume de dados em tempo real.
Alta disponibilidade dos dados e confiabilidade.
┌────────────────────────────────────────────────────────────────┐
│ ┌────────────┐ ┌──────────────┐ │
│ │ Produtor 1 ├─────┐ ┌────►│ Consumidor 1 │ │
│ └────────────┘ │ │ └──────────────┘ │
│ │ │ │
│ ┌────────────┐ │ ┌────────┐ │ ┌──────────────┐ │
│ │ Produtor 2 ├─────┼───►│ Broker ├───┼────►│ Consumidor 2 │ │
│ └────────────┘ │ └────────┘ │ └──────────────┘ │
│ │ │ │
│ ┌────────────┐ │ │ ┌──────────────┐ │
│ │ Produtor N ├─────┘ └────►│ Consumidor N │ │
│ └────────────┘ └──────────────┘ │
└────────────────────────────────────────────────────────────────┘
Os produtores irão produzir os informações para um broker e disponibilizá-los
para os consumidores.
O Apache Kafka, é uma plataforma distribuída de mensagens e streaming.
Diferente de Redis, rabbitMQ, que são sistemas de mensagerias.
A ideia do streaming no kafka é o mesmo de um broadcast com TCP em redes, ele replica para outros IPs,
mas só quem está pronto para recebe-lo irá consumir o dado.
┌───────────────────────────────────────────────────────────────┐
│ Producers │
│ ┌─────┐ │
│ │ APP ├───┐ │
│ └─────┘ │ ┌─────┐ │
│ │ ┌──►│ APP │ │
│ ┌────┐ │ │ └─────┘ │
│ │ DB ├──┐ ▼ │ │
│ └────┘ │ ┌────────┐◄─┘ Stream │
│ Connector ├──►│ Broker │ Processors │
│ ┌────┐ │ └────────┘◄─┐ │
│ │ DB ├──┘ ▲ │ │
│ └────┘ │ │ ┌─────┐ │
│ │ └──►│ APP │ │
│ ┌─────┐ │ └─────┘ │
│ │ APP │◄──┘ │
│ └─────┘ │
│ Consumers │
└───────────────────────────────────────────────────────────────┘
Conceitos:
Connectors
: conseguimos conectar o banco e disparar, por exemplo, um evento sempre que for feito um insert.Mensagens
: A informação produzida pelo produtorTópicos
: Meio por onde o produtor vai postar a mensagem e o consumidor consumirá
Partição 1 -> [1] [2] [3] [4] [5] [6] [7]
Partição 2 -> [1] [2] [3] [4] [5]
Partição 3 -> [1] [2] [3] [4] [5] [6]
offset
, como é chamada cada posição dentro da partição, em determinado ciclo de vida da aplicação,Produtores
: quem produz a mensagem;
Consumidores
: quem consome a mensagem;Brokers
: As instâncias do Kafka;Clusters
: Conjuntos de Brokers;Apache Zookeeper
: Gerenciador de Clusters;
Cluster
┌─────────────────────────────────────────────────────────────┐
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Broker 1 │ │ Broker 2 │ │ Broker 3 │ │ Broker 4 │ │
│ └────┬─────┘ └───┬──────┘ └─────┬────┘ └────┬─────┘ │
│ │ │ │ │ │
└───────┼─────────────┼────────────────┼─────────────┼────────┘
│ └───────┌────────┘ │
│ │ │
┌───────▼─────────────────────▼──────────────────────▼────────┐
│ Zookeper │
└─────────────────────────────────────────────────────────────┘
Apache Avro:
Quando tratamos de eventos, podemos ter qualquer mensagem (string, xml, json).
Todavia, em um contrato para as comunicações apiREST, utilizamos JSON.
{
"namespace": "com.exemple.avro",
"type": "record",
"name": "User",
"fields":[
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
{
"name": "Fulano Ciclano",
"favorite_number": 7,
"favorite_color": "purple"
}
Schema Registry:
Serviços de Stream
Stack:
Back:
Front:
Inicializaremos o nosso projeto através do spring initializr utilizando os parâmetros abaixo:
O SonarCloud é uma plataforma em nuvem para exibir o processo de inspeção continua do código de sua aplicação.
Para isso, o SonarCloud utiliza o SonarQube para realizar a “varredura” em seu código e analisar possíveis
vulnerabilidade, erros e regras específicas da linguagem (Code Smells).
analyze new project
Import another organization
, caso não haja nenhuma para selecionar direto);Manually
What option best describer your build?
, selecionaremos Gradle
SONAR_TOKEN
.set JAVA_HOME=C:\Program Files\Java\jdk1.8.0_281
set JAVA_HOME=C:\Users\Caiuzu\.jdks\adopt-openjdk-11.0.11
Em build.gradle, adicionaremos as dependências abaixo:
ext {
set('swaggerVersion', "2.9.2")
}
dependencies {
// Swagger
implementation "io.springfox
${springCloudVersion}"
implementation "io.springfox
${springCloudVersion}"
}
Iremos adicionar a anotação @EnableSwagger2
em nosso
main
Em seguida, criaremos o diretório config,
que será destinado a todas as configurações do nosso projeto. Dentro iremos criar a classe de configuração;
SwaggerConfiguration
Para funcionamento básico do swagger, devemos adicionar apenas as linhas abaixo. Para configurações adicionais,
podemos utilizar os outros métodos contidos na classe (Autenticação, informações sobre o projeto, etc).
@EnableSwagger2
@Configuration
public class SwaggerConfiguration {
@Bean
public Docket api() {
return new Docket(DocumentationType.SWAGGER_2)
.select()
.apis(RequestHandlerSelectors.basePackage("br.com.ecommerce.checkout"))
.paths(PathSelectors.any())
.build();
}
}
Basta criar um arquivo chamado Banner.txt, no diretório resources.
Spring Boot Actuator é um sub-projeto do Spring Boot Framework. Inclui vários recursos adicionais que nos ajudam a
monitorar e gerir o aplicativo Spring Boot. Ele usa endpoints HTTP ou beans JMX para nos permitir interagir com ele.
Expõe informações operacionais sobre o aplicativo em execução — integridade, métricas, informações, etc.
Adicionaremos as dependências para o actuator no arquivo build.gradle:
ext {
set('springBootVersion', "2.4.5")
}
dependencies {
// Spring Boot
implementation "org.springframework.boot
${springBootVersion}"
}
Em application.yml, iremos colocar as propriedades abaixo:
management:
endpoint:
health:
enabled: true
show-details: always
Desta forma, o actuator estará pronto, basta acessar: http://localhost:8080/actuator/
Esse Plugin tem a função de facilitar a conversão de um schema em uma classe java com apenas um comando.
Por exemplo, digamos que nosso schema esteja definido (schema exemplo abaixo), na teoria teríamos que traduzir,
manualmente cada definição de nosso arquivo para uma classe java, tornando o processo moroso.
{
"type": "record",
"name": "CheckoutCreateEvent",
"namespace": "br.com.ecommerce.checkout.event",
"fields": [
{
"name": "checkoutCode",
"type": ["string", "null"]
}
]
}
plugins {
id 'com.commercehub.gradle.plugin.avro' version '0.99.99'
}
repositories {
mavenCentral()
maven {
url 'http://packages.confluent.io/maven/'
allowInsecureProtocol(true)
}
}
dependencies {
// Avro
implementation 'io.confluent:kafka-avro-serializer:5.5.0'
}
avro {
fieldVisibility = "PRIVATE"
}
generateAvroJava {
source 'src/main/resources/avro'
}
generateTestAvroJava {
source 'src/main/resources/avro'
}
Em seguida, criaremos uma pasta nomeada avro dentro de resources,
na qual serão colocados nossos schemas e lidos para conversão.
Finalmente, para efetivamente gerar a classe, basta utilizar o comando no terminal: gradlew generateAvroJava
Antes de seguir os passos abaixo, garanta que seu docker está instalada conforme explicado no inicio deste documento.
O docker, é basicamente um container. Ele usa os próprios recursos do kernel de nosso SO para “simular” uma nova máquina.
Diferente de como faz uma VM (que gera um novo SO para realizar esta tarefa).
O docker-compose faz a orquestração desses containers. Assim, possibilitando uma infra local rápida e eficiente.
Iremos criar um diretório docker em nosso projeto e criaremos o arquivo de configuração
docker-compose.yml.
Comandos mais utilizados (antes de utiliza-los, devemos estar no diretório, no terminal):
docker-compose up --build -d
docker ps
docker-compose down
telnet localhost {porta}
Primeiro, temos que identificar o que queremos conteinerizar. Para este projeto serão
os seguinte itens: Banco de Dados(database-checkout e database-payment), Zookeerper, Kafka e Schema Registry;
Antes, precisamos entender cada linha de nosso docker-compose.yml
version: '3.7'
services:
database-checkout:
# image to fetch from docker hub
image: postgres:latest
# Environment variables for startup script
# container will use these variables
# to start the container with these define variables.
environment:
POSTGRES_DB: checkout
POSTGRES_USER: admin
POSTGRES_PASSWORD: admin
# Mapping of container port to host
ports:
- 5432:5432
version ‘3.7’
: Isso indica que estamos usando a versão 3.7 do Docker Compose, e o Docker fornecerá os recursos apropriados.
services
: Esta seção define todos os diferentes contêineres que criaremos. Em nosso projeto, temos cinco serviços (dois bancos, kafka, etc).
database-checkout
: Este é o nome do nosso serviço de banco de dados. O Docker Compose criará contêineres com o nome que fornecemos.
image
: Se não tivermos um Dockerfile e quisermos executar um serviço usando uma imagem pré-construída, especificamos o local da imagem usando a cláusula image. O Compose fará um fork de um contêiner dessa imagem.
ports
: Isso é usado para mapear as portas do contêiner para a máquina host.
environment
: A cláusula nos permite configurar uma variável de ambiente no contêiner. É o mesmo que o argumento -e no Docker ao executar um contêiner.
POSTGRES_PASSWORD
, POSTGRES_USER
, POSTGRES_DB
, indica ao docker, para inicializar nosso banco de dados com o usuário de conexão pré-configurado.Spring Data é um projeto SpringSource de alto nível cujo objetivo é unificar e facilitar o acesso a diferentes
tipos de armazenamentos de persistência, tanto sistemas de banco de dados relacionais quanto armazenamentos de dados NoSQLO Hibernate é um framework ORM, ou seja, a implementação física do que você usará para persistir, remover, atualizar ou buscar dados no SGBD. Por outro lado, o JPA (Java Persistence API) é uma camada que descreve uma interface comum para frameworks ORM.
Utilizando as configurações que definimos para nosso banco (em nosso caso, os passados em nosso docker-compose.yml, para database-checkout
),
iremos informar ao spring os dados para conexão da seguinte forma no arquivo application.yml:
spring:
datasource:
url: jdbc:postgresql://localhost:5432/checkout # jdbc:driver://url_de_conexão:porta/banco_de_dados
username: admin # usuário do banco
password: admin # senha do banco
driver-class-name: org.postgresql.Driver # driver
hikari: #
connection-test-query: select 1 # consulta que será executada pouco antes de uma conexão do pool ser fornecida para validar se a conexão com o banco de dados está ativa
Iremos fazer configurações do hibernate jpa
jpa:
hibernate:
ddl-auto: create-drop #Cria e então destrói o schema no final da sessão.
properties:
hibernate:
dialect: org.hibernate.dialect.PostgreSQLDialect
format_sql: true
show_sql: true
use_sql_comments: true
jdbc:
lob:
non_contextual_creation: true
jpa
:hibernate
:ddl-auto
: vai pegar a classe de entidade e irá gerar uma query de criação automaticamente de acordo com o parâmetro escolhidoproperties
:hibernate
:dialect
: especifica o dialeto que será usadoformat_sql
: formatação do sql ao exibir no console [ true | false ]show_sql
: exibir sql no console [ true | false ]use_sql_comments
: mostrar comentários no console [ true | false ]jdbc
:lob
: um lob é um objeto grande. As colunas Lob podem ser usadas para armazenar textos muito longos ou arquivos binários. Existem dois tipos de lobs: CLOB e BLOB. O primeiro é um lob de caracteres e pode ser usado para armazenar textos.non_contextual_creation
: criar lob no contexto [ true | false ]Primeiramente precisamos montar nosso contrato de API. Quando fazemos APIs pensando primeiro no contrato
(contract first), trazemos um nível de maturidade muito maior de entendimento da solução como um todo,
antes mesmo de começar a programar. Além de iniciar o desenvolvimento com um artefato (contrato) que pode agilizar
geração de código fonte, mocks, documentação etc.
Esse item é de grande importância, tanto para o back quanto para que o
front possa trabalhar em paralelo, tendo como base fiel, os dados que serão expostos pela nossa API.
Tópicos a serem estudados: Testes de Contrato de API e
Testes de Contrato de API com JOI
```http request
POST http://localhost:8085/v1/checkout/
Content-Type: application/json
{
“address”: “string”,
“cardCvv”: “string”,
“cardDate”: “string”,
“cardName”: “string”,
“cardNumber”: “string”,
“cep”: “string”,
“complement”: “string”,
“country”: “string”,
“email”: “string”,
“firstName”: “string”,
“lastName”: “string”,
“paymentMethod”: “string”,
“products”: [
“string”
],
“saveAddress”: true,
“saveInfo”: true,
“state”: “string”
}
---
### 2 - Estrutura de pacotes:
ecommerce-checkout-api/
├── docker/
├── src/
│ ├── main/
│ │ ├── java/
│ │ │ ├── br.com.ecommerce.checkout/
│ │ │ │ ├── config/
│ │ │ │ ├── entity/
│ │ │ │ ├── listener/
│ │ │ │ ├── repository/
│ │ │ │ ├── resource.checkout/
│ │ │ │ ├── service/
│ │ │ │ ├── streaming/
│ │ │ │ └─ CheckoutApplication.java
│ │ │ ├── checkout.event/
│ │ │ └── payment.event/
│ │ └── recources/
│ │ ├── avro/
│ │ ├── application.yml
│ │ └── banner.txt
│ └── test/
├─ .gitignore
├─ build.gradle
├─ settings.gradle
└─ README.md
---
### 3 - Iniciando nosso código:
<details>
<summary> Para este projeto aplicaremos alguns conceitos de SOLID:</summary>
O **S.O.L.I.D** é um acrônimo que representa cinco princípios da programação orientada a objetos e design de códigos
teorizados pelo nosso querido Uncle Bob (Robert C. Martin) por volta do ano 2000. O autor Michael
Feathers foi responsável pela criação do acrônimo:
- **S** _ingle Responsibility Principle (Princípio da Responsabilidade Única);_
- **O** _pen/Closed Principle (Princípio do Aberto/Fechado);_
- **L** _iskov Substitution Principle (Princípio da Substituição de Liskov);_
- **I** _nterface Segregation Principle (Princípio da Segregação de Interfaces);_
- **D** _ependency Inversion Principle (Princípio da Inversão de Dependências)._
> Mais sobre o assunto acessando: [Princípios de SOLID](https://mari-azevedo.medium.com/princípios-s-o-l-i-d-o-que-são-e-porque-projetos-devem-utilizá-los-bf496b82b299).
---
</details>
<details>
<summary> Também utilizaremos lombok:</summary>
O **Lombok** é um Framework criado sob licença MIT, podendo ser usado livremente em qualquer projeto Java.
Seu principal objetivo é diminuir a verbosidade das classes de mapeamento JPA, DTOs e Beans por exemplo.
Sua vantagem é evitar a repetição de código "clichê", como a criação de gets e sets para todos os atributos,
métodos equals e hashCode, toString, Construtores entre outros. Dessa forma, o código fica mais limpo e claro.
---
</details>
#### Iremos começar criando nossas classes:
- [CheckoutResource](./src/main/java/br/com/ecommerce/checkout/resource/checkout/CheckoutResource.java): Que será nossa
controller, responsável pelo processamento das requisições e por gerar respostas;
- Iremos anotar nossa classe com `@Controller` para indicar ao spring que nossa classe é uma controller;
- `@RequestMapping("/v1/checkout")` para mapear qual a path padrão para nosso resource;
- Então criaremos nosso método `create ()`
- Deverá ser anotado com `@PostMapping("/")`, para atender as requisições POST;
- Receberá como parâmetro um objeto `CheckoutRequest` e responderá um `ResponseEntity<CheckoutResponse>`;
```java
@PostMapping("/")
public ResponseEntity<CheckoutResponse> create(@RequestBody CheckoutRequest checkoutRequest) {}
lombok.Data
para a criação automática de getters e setters;lombok.AllArgsConstructor
, e para caso não tenha nenhum lombok.NoArgsConstructor
;java.io.Serializable
;Criaremos nossa entidade CheckoutEntity:
@Entity
do pacote javax.persistence.*
;@Id
: @Column
:@OneToOne
:@OneToMany
:lombok.Builder
,
final CheckoutEntity checkoutEntity = new CheckoutEntity();
@Builder
em nossa Entidade, para que em nosso *service possamos utilizar da seguinte forma:
final CheckoutEntity checkoutEntity = CheckoutEntity.builder().code().build(); // A funcionalidade foi exatrída para o método getCheckoutEntity()
Pattern Builder, é um padrão de design projetado para fornecer uma solução flexível para vários problemas de
criação de objetos na programação orientada a objetos. A intenção do padrão de projeto Builder é separar a construção
de um objeto complexo de sua representação.
O padrão Builder, da forma como foi descrito no livro Design Patterns: Elements of Reusable Object-Oriented Software,
contém os seguintes elementos:
org.springframework.stereotype.Repository
, para informar ao spring que nossa classe será um repositórioJpaRepositoroty<CheckoutEntty, Long>
, passando nossa entidade e a tipo do IDIremos criar nossa classe de serviços CheckoutService:
Deveremos anotar nossa classe com org.springframework.stereotype.Service
, para que seja criado uma instância do nosso serviço;
Iremos realizar a injeção de depedência de nosso repositório
em nosso service
, para que possamos utilizar os métodos JPA:
Comumente, para realizar a injeção de dependências é criado um construtor, como demonstrado abaixo:
private final CheckoutRepository checkoutRepository;
public CheckoutService(final CheckoutRepository checkoutRepository) {
this.checkoutRepository = checkoutRepository;
}
lombok.RequiredArgsConstructor
, o construtor será criado em tempo de compilação para todos atributos que estejam como final
, fazendo com que seja somente necessário a linha abaixo:
private final CheckoutRepository checkoutRepository;
AVISO! NUNCA UTILIZE
@AUTOWIRED
NO ATRIBUTO DA CLASSE PARA FAZER INJEÇÃO, É CRIME. SUJEITO A PAULADA!!!
Optional<>
, ela permite trabalhar com objetos nulos.save()
passaremos a instância de nossa entidade após manipularmos utilizando as ferramentas proporcionadas pelo @Build
em nosso repository
final CheckoutEntity entity = checkoutRepository.save(checkoutEntity);
Definiremos em nosso avro de CheckoutCreated.avsc os dados checkoutCode
e status
;
Tudo que produzirmos e jogarmos nesse tópico virtual
, para qual tópico real
ele será enviado?
cloud:
stream:
kafka:
binder: # Configurações para definir quem vai ser a ferramenta para messageria ou streaming (Kafka ou Rebbit)
autoCreateTopics: true # No momento de subir a aplicação ele cria um tópico automático, semelhante ao ddl do JPA
brokers: localhost:9092 # configura quem é o broker, poderia ter uma lista (porta default do kafka 9092)
bindings:
checkout-created-output: # Tópico Virtual
destination: streaming.ecommerce.checkout.created # Tópico Real. Padrão de nomenclatura -> tipo_de_informação.nome_de_domino.entidade.ação_realizada
contentType: application/*+avro # ContentType HTTP
producer:
use-native-encoding: true # usar o encoding nativo, o serializer e deserializer da confluent que definimos acima
payment-paid-input:
destination: streaming.ecommerce.payment.paid
contentType: application/*+avro
group: ${spring.application.name}
consumer:
use-native-decoding: true
kafka:
properties:
schema:
registry:
url: http://localhost:8081
specific:
avro:
reader: true # define como true a propriedade de leitura
producer: # Utilizaremos o Serializer e o Deserializer da confluent. Estes, já usam o schema registry, já possuí implementado a logica de pegar no schema registry e realizar validação do schema avro para manter a compatibilidade
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
yaml
bindings:
checkout-created-output:
destination: streaming.ecommerce.checkout.created
streaming.ecommerce.checkout.created
-> tipo_de_informação.nome_de_domino.entidade.ação_realizadastreaming
, ou etl
Anteriormente, executávamos ferramentas de linha de comando para criar tópicos no Kafka:
docker exec -t broker kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic streaming.ecommerce.checkout.created --if-not-exists
docker exec -t broker kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic streaming.ecommerce.payment.paid --if-not-exists
Mas, com a introdução do AdminClient no Kafka, agora podemos criar tópicos de maneira programática.
Precisamos adicionar o bean KafkaAdmin Spring, que adicionará tópicos automaticamente para todos os beans do tipo NewTopic:
@Configuration
public class KafkaTopicConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic1() {
return new NewTopic("checkout", 1, (short) 1);
}
}
Precisaremos criar uma classe de configuração de streaming StreamingConfig
Anotaremos com:
@Configuration
do pacote org.springframework.context.annotation.Configuration
;Utilizaremos o bean @Value("${}")
, para resgatar os valores de nosso application.yml
Para criar mensagens, primeiro precisamos configurar um ProducerFactory. Para isto, criaremos na classe de
configuração StreamingConfig, nossos métodos.
Isso definirá a estratégia para criar instâncias do Kafka Producer.
private ProducerFactory<String, CheckoutCreatedEvent> producerFactory(final KafkaProperties kafkaProperties) {
Map<String, Object> configProps = kafkaProperties.buildProducerProperties();
configProps.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
return new DefaultKafkaProducerFactory<>(configProps);
}
Em seguida, precisamos de um KafkaTemplate, que envolve uma instância do Produtor e fornece métodos convenientes para
enviar mensagens aos tópicos do Kafka.
@Bean
public KafkaTemplate<String, CheckoutCreatedEvent> kafkaTemplate(final KafkaProperties kafkaProperties) {
val kafkaTemplate = new KafkaTemplate<>(producerFactory(kafkaProperties));
kafkaTemplate.setDefaultTopic(defaultTopic);
return kafkaTemplate;
}
As instâncias do produtor são thread-safe. Portanto, o uso de uma única instância em todo o contexto do aplicativo
proporcionará melhor desempenho. Consequentemente, as instâncias KakfaTemplate também são thread-safe e o uso de uma
instância é recomendado.
Para publicar no kafka, podemos enviar mensagens usando a classe KafkaTemplate. Para isso, injetaremos a classe em
nosso service CheckoutService e
utilizaremos os métodos configurados em nosso KafkaTemplate
private final KafkaTemplate<String, CheckoutCreatedEvent> kafkaTemplate;
Chamaremos nossa instância de KafkaTemplate, e iremos enviar uma mensagem dentro do nosso método send()
:
kafkaTemplate.send(MessageBuilder.withPayload(checkoutCreatedEvent).build());
ConsumerFactory
e um KafkaListenerContainerFactory
.@KafkaListener
.@EnableKafka
é necessária na classe de configuração (ou no contexto da aplicação CheckoutApplication) para permitir a detecção da anotação @KafkaListener
em beans gerenciados por spring:Em seguida, precisamos configurar o ConsumerFactory
. Para isto, criaremos mais um método em nossa classe de configuração
StreamingConfig. Isso definirá a estratégia
para criar instâncias do Kafka Consumer.
private ConsumerFactory<String, PaymentCreatedEvent> consumerFactory(final KafkaProperties kafkaProperties) {
Map<String, Object> props = kafkaProperties.buildConsumerProperties();
KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer();
kafkaAvroDeserializer.configure(props, false);
return new DefaultKafkaConsumerFactory(props, new StringDeserializer(), kafkaAvroDeserializer);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, PaymentCreatedEvent>
kafkaListenerContainerFactory(final KafkaProperties kafkaProperties) {
ConcurrentKafkaListenerContainerFactory<String, PaymentCreatedEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(1); // definiremos a concorrência para 1
factory.setConsumerFactory(consumerFactory(kafkaProperties));
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD); // definiremos nosso AcknowledgingMessage como RECORD - Confirme o deslocamento após cada registro ser processado pelo ouvinte.
return factory;
}
Mais sobre ContainerProperties.AckMode
@KafkaListener(topics = "${}", groupId = "${}")
@KafkaListener(topics = "${spring.cloud.stream.bindings.payment-paid-input.destination}",
groupId = "${spring.cloud.stream.bindings.payment-paid-input.group}")
public void handler(PaymentCreatedEvent paymentCreatedEvent) {
checkoutService.updateStatus(paymentCreatedEvent.getCheckoutCode().toString(), Status.APPROVED);
}
- Schema Registry API Reference
- Schema Registry API Usage Examples
- Subjects: localhost:8081/subjects
[
"streaming.ecommerce.checkout.created-value"
]
[
"streaming.ecommerce.checkout.created-value"
]
Dessa forma, finalizamos nossa API de Checkout e podemos dar início a nossa Api de Payment, que atuara como nosso
consumer.
PROJETO PAYMENT API
javax.validation.constraints
(@NotEmpty
, etc), para validar os campo em