Olá pessoal, eu me chamo Ronaldo Lanellas e nessa aula a gente vai fazer um hands-on de como consumir no Kafka, usar estratégias de commit, seek e golang. Então vamos lá. Para começar, se vocês assistirem as aulas passadas, vocês vão ver que a gente subiu toda a infraestrutura de Kafka, o Keeper e uma ferramenta chamada KHQ usando o Docker Compose. Então, está aqui de pé, tudo rodando. Aqui na descrição do vídeo você vai encontrar o link para esse YAML do Docker Compose, então fica tranquilo, você consegue subir também nessa aula. E também você vai encontrar o link do código desse Consumer Go Land aqui na descrição do vídeo. Ok, então vamos lá. Primeiro a gente está usando aqui uma lib chamada Confluent Kafka Go, existem outras. Eu particularmente gosto bastante dessa da Confluent. Uso ela bastante. Então, de novo aqui, não é aula de Golang. Então, eu vou passar alguns detalhes aqui. Talvez, se você não conhece muito bem Golang, possa não fazer muito sentido para você. Eu vou tentar ser o mais claro possível. que possa não fazer muito sentido para você, vou tentar ser o mais claro possível. Mas fica tranquilo porque a lógica de configuração e de consumo é parecida para as outras linguagens também. Então, primeira parte, a gente conseguir criar o que a gente chama de config. Então, eu preciso configurar o meu consumidor da forma adequada para ter performance e não perder mensagem. Então, só organizando aqui. Opa! A primeira coisa que eu tenho é o Bootstrap Server, que nesse meu caso é o localhost. Então, eu estou rodando aqui o meu server localhost e estou apontando para o localhost. Então eu estou rodando aqui o meu meu server localhost. Estou apontando para o localhost. O group id é o nome do meu consumer group. Então se vocês voltarem em algumas aulas, vocês vão ver que a gente falou bastante sobre o que é consumer group, o que é consumer client, etc. E nesse caso eu coloco o nome do meu consumer group. Lembra que é o que é o consumer group. Lembra que, aqui é o consumer group, tá? Lembra que se você tiver, você pode ter essa mesma aplicação rodando várias vezes com o mesmo consumer group, tá? E aí nesse caso, apenas uma delas vai receber a mesma mensagem, ok? Eu desabilitei aqui o autocomite, então eu vou especificamente comitar mensagens, explicitamente, na verdade. Se vocês lembrarem, também a gente falou sobre estratégias de assignment de partição. Quando acontece o rebalance, como é que as partições são assinadas para cada tópico, para cada consumer dentro do consumer group. Aqui eu estou usando o stick. Lembra que eu falei que tem três? A gente tem o stick, o range, que é o default, padrão, e a gente tem o routing robin. Aqui eu estou usando o stick, que é considerada... A distribuição é muito boa e o rebalance, a quantidade de rebalance é muito menor. E eu tenho também o Auto Offset Reset, onde eu estou dizendo que eu quero sempre ler todas as mensagens do tópico que eu estou me assinando. Então, se for a primeira vez que eu estou lendo daquele tópico, nunca fiz nenhum commit, ele vai me entregar todas as mensagens se eu usar essa configuração earliest. Bom, essa nib de Golang aqui, por trás, ela usa um cara chamado nib rdkafka, tá? Escrito em C. E aí, por que eu tô falando isso? Porque todas as propriedades que você coloca aqui, você pode encontrar nesse link aqui que eu vou também colocar na descrição do vídeo tá brdcafka-properties então aqui você tem essa propriedade de consumer ou producer quando tem um asterisco pode servir para qualquer um dos dois, mas vamos descer um pouco aqui para o C. Então aqui eu tenho diversas propriedades para consumer, então você viu aqui groupId, aqui ele tem uma pequena descrição do que é e assim vai. Então é só para vocês entenderem que existem diversas outras propriedades que vocês podem consultar aqui nessa documentação. Bom, depois que eu configurei, que eu tenho meu config, eu crio meu consumidor. E se der algum problema, por exemplo, eu coloquei uma config errada, enfim, eu vou receber um erro e vou derrubar a minha aplicação. Seguindo, eu faço o subscribe nos tópicos, então eu posso ter end tópicos aqui. Nesse caso, eu estou me inscrevendo num tópico teste que já está criado aqui. Esse tópico teste tem cinco mensagens. que já está criado aqui. Esse tópico do teste tem cinco mensagens. Se der algum erro na hora de eu me inscrever, ele vai também derrubar a minha aplicação. Aqui é um pouco mais específico de Golang. Mas o que acontece? Quando você está consumindo uma mensagem, você precisa ficar fazendo o pooling do Kafka. Então você vai lá no Kafka, pergunta se ele tem mensagem, depois de alguns milissegundos pergunta de novo. Então você fica nesse fora infinito perguntando se tem mensagem no Kafka. Bom, o que acontece se você mandar um sinal de kill para a aplicação? Imagina que você está rodando o Dino, que se você der um kill menos 9, porque você quer derrubar a aplicação. Quando você manda esse sinal de kill, o ideal é que você faça o graceful shutdown. Que é o quê? Você termina de processar a sua mensagem e fecha o consumidor. Se você não faz isso, o que pode acontecer? Você deu um kill no meio do processamento, você já cometeu a mensagem, você não vai fechar o consumidor, então começa a não ficar, não ser um comportamento tão legal quando a gente fala de consumo de mensagens. Então, aqui no Golang, eu estou criando um channel. Que nada mais é do que uma forma de eu avisar quando um desses sinais aqui acontecer. Então, um sinal de shutdown no Linux, que eu estou matando a minha aplicação, vai ser enviado através desse channel. Para esse meu case aqui. Então já vou chegar lá mas só por enquanto entenda que eu estou criando um sinal, alguma forma, alguma triga que me diga quando eu receber um sinal de shutdown na minha aplicação. Ok. Eu tenho esse for aqui, infinito. Então esse for infinito aqui vai ficar rodando até eu de novo receber sinal e ele vai ficar rodando para consumir a mensagem. Esse select aqui é pra ele fazer o case do meu channel então a lógica é a seguinte tá se eu receber um sinal ele vai entrar aqui se eu não receber nenhum sinal desses ele entra aqui então em 99% dos casos minha aplicação tá rodando ele tá no loop infinito entrando sempre aqui. Então ele vai ficar entrando no for, fazendo for e entrando no default até eu mandar um sinal de kill para minha aplicação. Bom, enquanto a minha aplicação não estiver, meu consumidor não estiver fechado, então quando estiver aberto, porque eu abri meu consumidor aqui, tá? Me inscrevi, está aberto, estou pronto para consumir. Quando ele estiver aberto, eu começo a ler mensagem. Tá? Então esse read message aqui, se eu entrar nele aqui, ele vai retornar uma mensagem para mim. entrar nele aqui, ele vai retornar uma mensagem para mim. E aí eu tenho um timeout também de um segundo nesse caso, para caso eu não tenha nenhuma mensagem. Então como é que funciona? Como eu falei, a gente faz um pooling no Kafka, né? Então eu bato no Kafka e falo, se tem mensagem, espera um segundo. Se dentro de um segundo ele não me retorna nada, o próprio driver do gol landing vai me retornar um erro. E aí nesse caso aqui, ele retornar um erro dizendo que não tem mensagem, vai cair nesse else. E se não for timeout, ele vai printar esse erro. Então só resumindo aqui, né? Se o erro retornar para um timeout, eu não vou logar nada, eu vou em frente, vou para o próximo interação do for. Por quê? Porque o erro de timeout é um erro comum. Eu bati no carro, não tem mensagem, deu um segundo, tento de novo, deu um segundo, tento de novo. Se for um erro que não é timeout, imagina, deu que o broker está com problema, sei lá, qualquer outra coisa, eu vou printar esse erro aqui, porque não é um erro comum. Agora vamos para a situação em que você recebeu uma mensagem. Então você bateu no carro porque ele te mandou uma mensagem. Nesse caso, o erro é nil, então se não tem erro. Eu logo a mensagem. Perceba que aqui eu estou logando a chave. Eu estou logando o valor. Tá? A partição e o offset. Ok? E aqui eu teria uma lógica de SIC. Que é, lembra que eu falei do SIC? Que a gente pode voltar a mensagem? O que eu fiz aqui, tá? Que a gente pode voltar a mensagem? O que eu fiz aqui? Se dentro da mensagem Dentro do valor da mensagem Eu tiver uma palavra SICK Significa que eu vou Fazer um SICK Na partição atual O que é isso? Eu vou ficar recebendo a mesma mensagem Durante 10 vezes Então durante 10 vezes Eu vouendo a mesma mensagem durante dez vezes. Então durante dez vezes eu vou receber a mesma mensagem. Estou fazendo o seek e o retry na mesma mensagem. Eu dou um sleep aqui de dois segundos só para não ficar, só para não fazer um seek na mesma mensagem em milissegundos. Então eu recebo a mesma mensagem, espero dois segundos, recebo a mesma mensagem de novo. Se a mensagem não conter o SICK, o que eu vou fazer é comitar ela. Então eu vou simplesmente chamar o commit, passar na mensagem. E no final de tudo, como eu falei, eu vou dar o close no meu consumidor se eu receber um sinal de kill do sistema operacional. Bom, meu consumidor está rodando e está escrito nesse tópico teste. Quando o meu consumidor está escrito no tópico ele fica verde. Então o nome do meu consumidor é ronaldo-v2 e o lag0 significa que não tem nenhuma mensagem para me ler. Se eu produzir uma mensagem aqui, a chave vou colocar n, vou colocar um header qualquer aqui, região São Paulo, e vou colocar uma mensagem hello. Quando eu produzo a mensagem, automaticamente a minha aplicação já leu a chave n, a mensagem automaticamente a minha aplicação já leu a chave n o valor hello essa mensagem foi produzida na partida 0 e o offset é o 7 vou produzir mais uma aqui vou colocar hello002 agora essa mensagem não tem nenhuma chave, então a chave está vazia o hello aqui é o 002 e o offset é o 8 bom, se vocês olharem a minha mensagem aqui, todas as minhas mensagens, ela tem aqui SIC, tem alguns SIC de exemplo aqui eu vou produzir um SIC para vocês verem o que acontece. Então, de acordo com a minha lógica, o que vai acontecer? Eu vou ler a mensagem, ele vai ver que tem um SIC, ele vai ficar nessa mesma mensagem 10 vezes e depois vai cometer a mensagem. Então, vamos lá. Então, o SIC percebe que ele está recebendo a mesma mensagem 7, 9, partição 0 7, 9, partição 0 Até ele comitar Perfeito, parou de receber Então fez o commit. É interessante mostrar para vocês aqui, o lag está zerado. Percebem que eu tinha outros consumidores aqui que morreram, que eu derrubei, que está com lag 3. Isso significa o que? Que esse consumidores, se eu subir ele de novo ele vai ler três mensagens, vamos ver se é verdade? então vou parar esse meu aqui eu vou renomear o nome desse, eu vou renomear aqui pra v1 e quando eu subir ele vai receber essas três mensagens que estão no lag perfeito então ele recebeu aqui a primeira mensagem a segunda que eu acabei de produzir, e o SICK vai ficar tentando dez vezes até comitar. Então ele está fazendo exatamente o mesmo comportamento que eu fiz ainda há pouco, só que com o novo consumidor, na verdade com o consumidor antigo que eu só subi de novo. E se você olhar aqui o lag, vamos ver aqui. Agora o lag aqui está zerado e ele está verde porque é o consumidor que está ativo. Era isso pessoal, espero que vocês tenham gostado e até a próxima.