Kafka: a publish-subscribe based messaging system
Note: RabbitMQ & ActiveMQ are point-to-point messaging systems. In a traditional queue, each message will go to only one consumer.
Advantages:
Feature | Kafka | RabbitMQ |
---|---|---|
Architecture | Distributed streaming platform, publish-subscribe | Traditional message broker, broker-centric model |
Message Model | Messages stored in topics, offset-based consumption | Messages sent to queues via exchanges, various routing mechanisms |
Performance | High throughput, suitable for real-time streaming | Low-latency delivery, suitable for complex routing |
Persistence | Messages persisted to disk with configurable retention | Messages can be persisted, but focus on delivery |
Scalability | Easy horizontal scaling with partitions | Scalable but more complex, involves clustering and federation |
Ordering | Strong ordering within partitions | Weaker ordering guarantees, especially in clusters |
Msg Retention | Policy-based; messages are retained for a configured period (e.g. 30 days) | Message is removed once acknowledged or processed |
Kafka Core APIs:
zookeeper-server-start.bat config\zookeeper.properties
start kafka server
kafka-server-start.bat config\server.properties
create a topic with 3 partitions and a replication factor of 1
kafka-topics --zookeeper 127.0.0.1:2181 --topic second_topic --create --partitions 3 --replication-factor 1
kafka-topics --zookeeper 127.0.0.1:2181 --list
detail info of given topic
kafka-topics --zookeeper 127.0.0.1:2181 --topic first_topic --describe
kafka-console-producer --broker-list 127.0.0.1:9092 --topic first_topic
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic
create a consumer that reads all previous messages from the beginning
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning
consumers with the same group name share the messages of a topic between them
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic --group my_first_app
kafka-consumer-groups --bootstrap-server localhost:9092 --list
kafka-consumer-groups --bootstrap-server localhost:9092 --group my_first_app --describe
kafka-consumer-groups --bootstrap-server localhost:9092 --group mygroup --reset-offsets --topic first_topic --to-earliest --execute
starter project - spring-web, spring-kafka
/**
 * Spring configuration for the consuming side: turns on Kafka listener
 * support and exposes the {@link ConsumerFactory} for String/String records.
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    /** Address of the Kafka broker the consumer bootstraps from. */
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    /** Consumer group id placed in the consumer properties. */
    private static final String GROUP_ID = "consumer-group";

    /**
     * Creates the consumer factory from a fixed property set: String
     * deserializers for key and value, the broker address and the group id.
     *
     * @return a factory producing consumers for String keys and String values
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        return new DefaultKafkaConsumerFactory<>(props);
    }
}
/**
 * Listener component that receives String messages from the
 * {@code helloTopic} topic as a member of the {@code consumer-group} group.
 */
@Component
public class KafkaConsumer {

    /**
     * Handles one record value delivered by the listener container and
     * prints it to standard output.
     *
     * @param message the record value received from the topic
     */
    // The annotation attribute is "topics" (plural); "topic" does not exist
    // on @KafkaListener and fails to compile.
    @KafkaListener(topics = "helloTopic", groupId = "consumer-group")
    public void consumer(String message) {
        System.out.println(message);
    }
}
@RestController
@RequestMapping("/produce/{message}")
public class HomeController{
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@GetMapping
public String publish(@PathVariable("message") String message){
KafkaTemplate.send("helloTopic", message)
return "Message published: "+ message;
}
}
public class kafkaConfig{
@Bean
public kafkaTemplate<String, String> kafkaTemplate(){
return new KafkaTemplate<>(producerFactory())
}
@Bean
public ProducerFactory<String, String> producerFactory(){
Map<String, Object> configMap = new HashMap<>();
configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configMap.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group");
configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaProducerFactory(configMap);
}
}