Apache Kafka, a evolução do stream de dados: mãos na massa!
Em nosso post anterior sobre o Apache Kafka falamos sobre sua utilidade, objetivos e funcionamento. Agora vamos botar a mão na massa e ver como colocamos uma aplicação básica no ar. Os códigos apresentados abaixo fazem parte de um teste completo que tem o seguinte formato:

Praticando as funcionalidades do Apache Kafka
Subindo e executando o Kafka
Para começar, recomendamos baixar o Kafka que está disponível no site em formato ZIP. Este arquivo contém todos os scripts para executar o Kafka localmente. Também existe uma versão famosa dockerizada do Kafka feita pelo Spotify.
Para subir, é necessário rodar uma instância do Zookeeper, que acompanha o Kafka. Então, deve-se criar os tópicos informando o nome, a quantidade de replicações e de partições que cada tópico terá. Para isso, basta executar os seguintes comandos:
// Subir o zookeeper com suas configs default
bin/zookeeper-server-start.sh config/zookeeper.properties
// Subir o zookeeper com suas configs default
bin/kafka-server-start.sh config/server.properties
//
Criar os tópicos do kafka
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic kafka_topic
// List os topicos existentes no kafka
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
O último comando exibe todos os tópicos criados no Kafka, se parar o Kafka e subir novamente, os tópicos continuarão no ar e ele continuará a distribuir as mensagens de onde parou. Então, para enviar ou receber uma mensagem, basta subir um consumer ou producer. Para fazer isso também são disponibilizados os seguintes scripts:
// Para produzir uma mensagem bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_topic // Para consumir uma mensagem bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka_topic --from-beginning
PS: O comando “–from-beginning” faz com que o consumidor inicie a leitura do tópico a partir do offset 0 ao subir.
Kafka Publish and Subscriber
Inicialmente precisamos importar o cliente que o próprio Apache Kafka fornece, usando o Maven (arquivo pom.xml):
<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.3.0</version> </dependency> </dependencies>
O programa, em Java, abaixo, é de um producer que gera uma mensagem em Json:
package myapps; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Arrays; import java.util.List; import java.util.Properties; import java.util.Random; public class Producer { private static String topico = "streams-all-messages"; private static String mensage = "{\"type\":\"adr\",\"content\":{“...”}}"; public static void main(String[] args) throws InterruptedException { Properties properties = new Properties(); // IP e Porta do broker bootstrap properties.put("bootstrap.servers", "localhost:9092"); // Id do producer que esta se connectando no broker properties.put("client.id", "cliente.1"); // A chave será serializada para Long properties.put("key.serializer", LongSerializer.class.getName()); // O valor será serializado para String properties.put("value.serializer", StringSerializer.class.getName()); KafkaProducer<String,String> producer = new KafkaProducer<>(properties); System.out.println("Send on topic " + topico + " the message: " + mensage); ProducerRecord<String,String> record = new ProducerRecord<>(topico, mensage); producer.send(record); } }
Já o modelo abaixo é de um Consumer que recebe quatro mensagens diferentes (sendo duas com dados em string e duas com dados em long):
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Arrays; import java.util.List; import java.util.Properties; public class Consumer { private static List<String> topics_type_1 = Arrays.asList("streams-all-messages", "streams-only-addresses", "streams-updated-addresses"); private static List<String> topics_type_2 = Arrays.asList("streams-count-updated-addresses"); public static void main(String[] args) { Properties properties = new Properties(); // IP e Porta do broker bootstrap properties.setProperty("bootstrap.servers", "localhost:9092"); // Id do grupo do tópico que esta se conectando no broker properties.setProperty("group.id", "group.1"); // Id do producer que esta se conectando no broker properties.setProperty("client.id", "cliente.1"); // Define se o commit das mensagens lidas vai ser automático properties.setProperty("enable.auto.commit", "true"); // Tempo maximo de espera do commit até ser considerado falha properties.setProperty("auto.commit.interval.ms", "5000"); // Pega o offset mais recente ao se conectar com o kafka properties.setProperty("auto.offset.reset", "earliest"); // Máximo de mensagem que irá ser pega por vez (lote) properties.setProperty("max.poll.records", "500"); // Tempo do heartbeat que o componente irá enviar ao kafka properties.setProperty("heartbeat.interval.ms", "1000"); // Espera do heartbeat do componente até ser considerado falha properties.setProperty("session.timeout.ms", "10000"); KafkaConsumer<String, String> consumerType1 = getConsumer(properties, topics_type_1, StringDeserializer.class.getName()); KafkaConsumer<String, Long> consumerType2 = getConsumer(properties, topics_type_2, LongDeserializer.class.getName()); try { while (true) { consumerType1.poll(Duration.ofSeconds(10)).forEach(record -> { System.out.println(record.topic()+": "+record.key()+" - "+record.value()); }); consumerType2.poll(Duration.ofSeconds(10)).forEach(record -> { System.out.println(record.topic()+": "+record.key()+" - "+record.value()); }); } } finally { consumerType1.close(); consumerType2.close(); } } private static <T> KafkaConsumer<String, T> getConsumer(Properties properties, List<String> topics_type, String valueDeserializer) { // A chave será serializada para Long properties.setProperty("key.deserializer", StringDeserializer.class.getName()); // O valor será serializado para String properties.setProperty("value.deserializer", valueDeserializer); KafkaConsumer<String, T> consumer = new KafkaConsumer<>(properties); consumer.subscribe(topics_type); return consumer; } }
Kafka Streams
Tanto o Kafka Streams como Kafka Tables trabalham com o conceito de Topologia. O stream nada mais é que a execução das topologias informadas. Ao gerar um stream ou table, ele retorna a classe Topology, que será executada (da mesma forma que os predicados no próprio Stream do Java).
Inicialmente deve-se importar a biblioteca do Kafka Streams usando o Maven:
<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.3.0</version> </dependency> </dependencies>
Abaixo, mais um modelo de programa em Java, que é um Stream que “ouve” o tópico “streams-all-messages” e filtra a mensagem apenas quando ela for do tipo “Adr” (Address):
import com.google.gson.Gson; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; import java.util.Properties; import java.util.concurrent.CountDownLatch; import static java.util.Objects.nonNull; public class FilterAddresses { public static void main(String[] args) throws Exception { FilterAddresses filterAddresses = new FilterAddresses(); filterAddresses.run(); } private void run() { Properties properties = getProperties(); Topology topology = getTopology(); KafkaStreams streams = new KafkaStreams(topology, properties); CountDownLatch latch = new CountDownLatch(1); shutdown(streams, latch); try { streams.start(); latch.await(); } catch (Throwable e) { System.exit(1); } System.exit(0); } private Topology getTopology() { StreamsBuilder builder = new StreamsBuilder(); builder.<String, String>stream("streams-all-messages") .map((key, value) -> KeyValue.pair(key, parseAsMessage(value))) .filter((key, value) -> nonNull(value) && value.isAddress()) .map((key,value) -> KeyValue.pair(key,parseAsString(value.getContent()))) .to("streams-only-addresses"); return builder.build(); } private Message parseAsMessage(String value) { try { return (new Gson()).fromJson(value, Message.class); } catch (Exception ex) { System.out.println(ex.getMessage()); return null; } } private String parseAsString(UpdateAddresses.Message value) { try { return (new Gson()).toJson(value); } catch (Exception ex) { System.out.println(ex.getMessage()); return null; } } private Properties getProperties() { Properties properties = new Properties(); properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit"); properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); properties.setProperty("group.id", "group.2"); return properties; } private void shutdown(KafkaStreams streams, CountDownLatch latch) { Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); } class Message { String type; UpdateAddresses.Message content; public boolean isAddress() { return MsgType.ADDRESS.getValue().equals(type); } public UpdateAddresses.Message getContent() { return content; } } enum MsgType { ADDRESS("adr"); String type; MsgType(String type) { this.type = type; } String getValue() { return type; } } }
Kafka Tables
O Kafka tables faz parte do Streams Framework, assim, basta importar o mesmo framework, conforme foi feito anteriormente via Maven.
O exemplo a seguir é um stream que processa a mensagem sumarizando e separando os endereços que foram filtrados no stream acima, tornando eles uma Kafka Table:
import com.google.gson.Gson; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Produced; import java.util.Properties; import java.util.UUID; import java.util.concurrent.CountDownLatch; import static java.util.Objects.nonNull; public class UpdateAddresses { public static void main(String[] args) throws Exception { UpdateAddresses updateAddresses = new UpdateAddresses(); updateAddresses.run(); } private void run() { Properties properties = getProperties(); Topology topology = getTopology(); KafkaStreams streams = new KafkaStreams(topology, properties); CountDownLatch latch = new CountDownLatch(1); shutdown(streams, latch); try { streams.start(); latch.await(); } catch (final Throwable e) { System.out.println(e.getMessage()); } } private Topology getTopology() { StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> stream = builder.stream("streams-only-addresses"); KGroupedStream<String, String> result = stream .map((key, value) -> KeyValue.pair(key,parseAsMessage(value))) .filter((key, value) -> nonNull(value)) .selectKey((keu, value) -> value.getPerson()) .map((key, value) -> KeyValue.pair(key,parseAsString(value))) .groupByKey(); KTable<String, String> address = result.reduce((key, value) -> value); KTable<String, Long> changes = result.count(); ] address.toStream().to("streams-updated-addresses", Produced.with(Serdes.String(), Serdes.String())); changes.toStream().to("streams-count-updated-addresses", Produced.with(Serdes.String(), Serdes.Long())); return builder.build(); } private Message parseAsMessage(String value) { try { return (new Gson()).fromJson(value, Message.class); } catch (Exception ex) { System.out.println("parseAsMessage: " + ex.getMessage()); System.out.println(ex.getStackTrace()); return null; } } private String parseAsString(Message value) { try { return (new Gson()).toJson(value); } catch (Exception ex) { System.out.println("parseAsString: " + ex.getMessage()); System.out.println(ex.getStackTrace()); return null; } } private Properties getProperties() { Properties properties = new Properties(); properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount2"); properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); properties.setProperty("group.id", "group.2"); return properties; } private static void shutdown(KafkaStreams streams, CountDownLatch latch) { Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); } class Message { String person; String street; Integer number; String neighborhood; String city; String state; String country; String zipcode; String getPerson() { return person; } } }
Conclusões
O Apache Kafka é uma ferramenta incrível, e além de extremamente performática, é segura e resiliente. São várias as empresas que utilizam o Kafka, e no geral, são empresas que necessitam de extrema performance em suas aplicações, como Netflix, Spotify e no próprio AllowMe, aqui na Tempest.