Olá pessoal, eu me chamo Ronaldo Lanielas e nessa aula a gente vai ver como usar o Kafka Connect para extrair dados do PostgreSQL e gravar esses dados em um tópico do Kafka. Então, a primeira coisa que eu vou fazer, eu já tenho aqui o meu Docker Compose criado, que foi da última aula, que tem meus serviços. Eu só preciso de mais um aqui, que é o Postgreed. Então, vou procurar aqui no Docker Hub o serviço imagem do Postgreed. Já deve ter aqui o Compose pronto. Então, já tem aqui. Vou só copiar esse carinha aqui beleza, então imagens e postgre vou deixar esse sem exemplo, não tem problema ok, beleza eu preciso de mais uma coisa aqui que é o meu plugin JDBC. Então, se você for na Confluent Hub, Confluent Hub, e digitar... Não, eu abri errado, confunde obvi, jdbc aqui jdbc então eu vou precisar desse conector aqui então lembra que esse aqui é o plugin. Então vou jogar ele aqui por enquanto. Vou criar um diretório chamado data. Se vocês olharem aqui no mapeamento, ele está mapeando o ponto data para o data. Então eu preciso ter um diretório plugins aqui que tenha os meus plugins. Então eu vou criar aqui dentro do data, plugins. Aí eu vou mover esse carinha aqui para o plugins. Aí eu vou extrair ele. vou mover esse carinha aqui para o plugins aí eu vou extrair ele então tem aqui kk-connect, jdbc, beleza vamos dar uma olhada nas propriedades desse conector Como eu falei em aulas anteriores, todo plugin tem uma lista de configurações. É só a gente procurar a correta. Como você deve ter percebido aqui, eu tenho qualquer classe que eu tenho que utilizar, conexão na base, etc., connection URL. Então, vamos primeiro subir o nosso cluster, já que eu já instalei o plugin, já preparei o meu Postgre, então agora eu vou dar um look com o postup, e vamos ver se vai dar tudo certo. Dando tudo certo, a gente utiliza para poder fazer conexão em base de dados. a gente se conectar aqui no Postgre então localhost a porta eu acho que a gente não colocou porta vamos ver se ele cadê o postgre eu acho que a gente vai ter que expor a porta. Vamos fazer um teste aqui. Vamos ter que expor a porta, mas é bem tranquilo. Na verdade eu só preciso vir aqui e colocar o port 54325432. E aí eu vou reiniciar aqui o meu docker, meu docker não, reiniciar o serviço então ele recriou aqui o db na porta 5432 agora tá exposta, Se a gente dá um telnet, ela conectou. Legal. Então, o usuário postgre e a senha xampple. Perfeito, conectou vou criar uma tabela aqui vou chamar de client eu prefiro criar na verdade utilizando o console mas você pode criar se preferir também utilizar o próprio a própria ideia do DataGrip então, primaryKey, name, vachar, same not know aqui está suficiente vamos dar um select aqui nessa tabela perfeito Vamos dar um SELECT aqui nessa tabela. Perfeito. Bem simples, né? Só o ID e o NAME, o ID é um inteiro e o nome é um WAFAR. Ok. Dado que o meu cluster está de pé, vamos ver se ele retorna esse info.cluster. Retornou. Agora ele deve retornar o plugin JDBC que eu instalei. Perfeito. Então ele instalou aqui para mim dois plugins, na verdade, né? Um source e um sync JDBC. Bom, agora a gente vai criar o conector JDBC que vai se comunicar com essa base Postgre, que eu acabei de criar, e com essa tabela cliente. Então, eu vou fazer um insert aqui. Ali eu vou fazer melhor. Eu vou criar o conector, tá? E aí depois eu faço um insert, vocês vão ver que ele vai coletar aquele dado e mandar para um tópico do Kafka. Tá bom? Então vamos lá. Vamos no post aqui. Eu vou chamar de source jdbc-post-grease. Vamos pegar a colinha aqui. A classe é essa aqui. então a classe é essa aqui vou colocar uma task só key converter vai ser string json o tópico vou colocar source post-degree na verdade esse aqui é um source, então não tem tópico ele mesmo, só para vocês entenderem, como ele é um source o próprio conector neste caso, ele vai escolher qual que é o tópico, enfim, usando o nome da tabela, etc como eu falei, isso depende muito do plugin, mas nesse plugin do JDBC, ele vai escolher qual que é o nome do tópico baseado no nome da tabela que você está utilizando. Então, vamos descer um pouco aqui. A gente tem o connection URL. Que vai ser... Nesse caso, eu vou copiar aqui do Oracle. Vou fazer melhor. que vai ser nesse caso, eu vou copiar aqui do Oracle vou fazer melhor, aqui se você estiver usando o data grip ele já te dá aqui o URL jdbc, tá Aí tem um detalhe aqui, como o meu Kafka Connection é dentro daquele docker-compose que está na mesma rede do DB. Então eu posso simplesmente colocar DB aqui em vez de localhost. Ele vai se comunicar dentro da própria network do docker. Não vai sair daquela rede do Docker. Usuário. Postgre. Deve ter um password, provavelmente. Example. Ok, não vou utilizar. Table white list. Aqui são as tabelas que eu quero ler. Aqui é a minha tabela client que eu acabei de criar. Blacklist se eu quisesse remover alguma no meu caso. Ok, não preciso do schema, estou pegando do public. Ok, ok. Bom, aí os modos. Como eu falei, cada conector tem sua especificidade. Então nesse caso, você vai dizer como é que você quer que ele pegue esses dados. Eu geralmente uso o modo incremental, como a gente tem uma APK que vai ficar sempre incrementando, a gente pode usar esse modo incrementing aqui. Então nesse caso ele vai, baseado no meu ID, que é 1, 2, 3, 4, 5, 6, é óbvio né, eu vou incrementar isso sequencialmente, ele vai utilizar esse ID para poder saber qual é o último dado que ele leu e qual é o próximo. incremento, já que eu estou usando esse modo de incremento, qualquer coluna, no caso é o ID, se fosse, por exemplo, incremento baseado no timestamp, eu teria que colocar o timestamp coluna M, etc, não é o caso. A query, você pode colocar uma query específica aqui para poder selecionar a registra, mas eu não vou colocar, eu vou deixar ele construir a query baseada no que eu coloquei aqui, de incremento, etc. Aqui tem o tipo de tabela que ele vai ler, então assim, o padrão é table, não vou mexer, mas poderia ser uma view, etc. Também não vou mexer no pool, então de 5 em 5 segundos ele vai bater lá na minha tabela para verificar se o registro chegou ou não. Ok. Nesse caso eu não vou mexer também, vou deixar do jeito que está. Aqui a quantidade de linhas que ele pode trazer. Se eu quiser, por exemplo, lembra que eu falei que na hora que ele for gravar no Kafka, ele vai criar um tópico. Eu posso colocar um prefixo nesse tópico se eu quiser. Então, ele vai criar aquele tópico com o prefixo que eu quero. Vou colocar aqui assim, Lanyellis. Então, ele vai colocar esse prefixo aqui em todos os tópicos timezone etc bom é isso vamos criar vamos ver se falta alguma coisa a primeira coisa que eu faço ah importante né eu preciso colocar o nome... aqui tá errado, tá pessoal? aqui é só connectors e o nome fica aqui embaixo, ok? então vamos lá ok, fiz a criação como é que eu sei que esse conector está rodando? Eu vou aqui em Status, coloco o nome do meu conector. Perfeito. O que ele me traz aqui? Que o conector está running. Lembra que eu falei que o conector é diferente da T? Então, o conector está rodando nesse nó, que é o Connect 83, e a task está rodando nesse nó. Como eu só tenho um nó, vai sempre rodar no mesmo, obviamente. Mas se eu tivesse vários nós e diversas tasks, você ia ver que o ID ia ser diferente e o nó provavelmente também vai ser diferente. E aí ele diz aqui que é um source. Beleza. Você também pode ver essa mesma informação no AKHQ, que é a interface gráfica. Então se você vier aqui nessa seta, ele vai mostrar aqui para mim que o conector está em estado de running, inclusive eu posso deletar aqui se eu quiser, ou ver as configurações aqui também. É a mesma coisa se você utilizasse via postman. Bom, vamos voltar aqui agora, não tem nenhum tópico ainda, eu vou fazer um insert. ainda eu vou fazer um insert into client idname cliente ronaldo id1 a gente abrir aqui olha só o que ele traz. Ele vai trazer aqui toda a informação de esquema, né? Então, o ID é um índice 32, tem o nome, e aqui eu tenho realmente o payload que é Ronaldo1, tá? Se eu inserir um outro plant aqui, vou colocar aqui Mike. Mesma coisa, tá? Ele vai entender que o novo cliente é o Mike e vai opa, aqui, ele vai adicionar o Mike. Então ele faz esse controle baseado sempre, como eu falei, nesse ID já que eu escolhi o modo incremental. Eu não sei se vocês lembram anteriormente, mas eu falei que o Kafka Connect tem alguns tópicos que ele utiliza para controlar offset, etc. Nesse meu caso, ele está usando o tópico de config, offset e status. Então, se eu abrir config, olha só o que ele tem. Ele tem todas as configs do meu conector. Lembra que eu falei? O tópico de config ele grava as configs do meu conector. O tópico de offset ele vai gravar exatamente onde o meu conector está. Então olha só. Como é que ele sabe que no próximo tem que ler o 3? Porque ele tem aqui no tópico de offset dizendo que ele leu o 2. E no próximo é o 3 e assim por diante. E aí o status vai mostrar como é que o meu conector está. Então, nesse caso aqui ele está running. Ok? Eu espero que vocês tenham gostado, pessoal. E até a próxima.