# broker.id must be unique on every server
broker.id=0
# hostname
host.name=172.18.240.36
# add the following three entries below log.retention.hours=168
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
# ZooKeeper connection address
zookeeper.connect=172.18.240.36:4001
# default number of partitions
num.partitions=2
# hostname (second broker — remember to give it a distinct broker.id)
host.name=172.18.240.62
# add the following three entries below log.retention.hours=168
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
# ZooKeeper connection address
zookeeper.connect=172.18.240.36:4001
# default number of partitions
num.partitions=2
nohup ./kafka-server-start.sh ../config/server.properties 1>/dev/null 2>&1 &
./kafka-console-producer.sh --broker-list 172.18.240.36:9092 --topic test-test
./kafka-console-consumer.sh --bootstrap-server 172.18.240.62:9092 --topic test-test --from-beginning
kafka:
  consumer:
    default:
      server: 172.18.240.36:9092,172.18.240.62:9092
      enableAutoCommit: false
      autoCommitIntervalMs: 100
      sessionTimeoutMs: 15000
      groupId: data_analysis_group
      autoOffsetReset: latest
  producer:
    default:
      server: 172.18.240.36:9092,172.18.240.62:9092
      retries: 0
      batchSize: 4096
      lingerMs: 1
      bufferMemory: 40960
package com.dtdream.analysis.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import java.util.HashMap;
import java.util.Map;
@ConfigurationProperties(
        prefix = "kafka.consumer.default"
)
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerConfig.class);

    // NOTE(review): @ConfigurationProperties relaxed binding discovers JavaBean
    // instance properties; static setters are not seen by bean introspection, so
    // these static fields may never be populated from "kafka.consumer.default.*".
    // Verify binding actually works at runtime — converting to instance fields is
    // the conventional fix, but would break any callers of the static getters.
    private static String autoCommitIntervalMs;
    private static String sessionTimeoutMs;
    // Deserializer classes default to StringDeserializer; parameterized with <?>
    // to avoid raw-type usage (binary-compatible with the previous raw Class).
    private static Class<?> keyDeserializerClass = StringDeserializer.class;
    private static Class<?> valueDeserializerClass = StringDeserializer.class;
    private static String groupId = "test-group";
    private static String autoOffsetReset = "latest";
    private static String server;
    private static boolean enableAutoCommit;

    public static String getServer() {
        return server;
    }

    public static void setServer(String server) {
        KafkaConsumerConfig.server = server;
    }

    public static boolean isEnableAutoCommit() {
        return enableAutoCommit;
    }

    public static void setEnableAutoCommit(boolean enableAutoCommit) {
        KafkaConsumerConfig.enableAutoCommit = enableAutoCommit;
    }

    public static String getAutoCommitIntervalMs() {
        return autoCommitIntervalMs;
    }

    public static void setAutoCommitIntervalMs(String autoCommitIntervalMs) {
        KafkaConsumerConfig.autoCommitIntervalMs = autoCommitIntervalMs;
    }

    public static String getSessionTimeoutMs() {
        return sessionTimeoutMs;
    }

    public static void setSessionTimeoutMs(String sessionTimeoutMs) {
        KafkaConsumerConfig.sessionTimeoutMs = sessionTimeoutMs;
    }

    public static Class<?> getKeyDeserializerClass() {
        return keyDeserializerClass;
    }

    public static void setKeyDeserializerClass(Class<?> keyDeserializerClass) {
        KafkaConsumerConfig.keyDeserializerClass = keyDeserializerClass;
    }

    public static Class<?> getValueDeserializerClass() {
        return valueDeserializerClass;
    }

    public static void setValueDeserializerClass(Class<?> valueDeserializerClass) {
        KafkaConsumerConfig.valueDeserializerClass = valueDeserializerClass;
    }

    public static String getGroupId() {
        return groupId;
    }

    public static void setGroupId(String groupId) {
        KafkaConsumerConfig.groupId = groupId;
    }

    public static String getAutoOffsetReset() {
        return autoOffsetReset;
    }

    public static void setAutoOffsetReset(String autoOffsetReset) {
        KafkaConsumerConfig.autoOffsetReset = autoOffsetReset;
    }

    /**
     * Listener container factory used by {@code @KafkaListener} methods:
     * 10 concurrent consumers, 3s poll timeout, and a pass-through record
     * filter that only logs record coordinates at DEBUG level.
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(10);
        factory.getContainerProperties().setPollTimeout(3000);
        // RecordFilterStrategy is a functional interface — a lambda replaces the
        // anonymous class. Returning false means no record is ever filtered out.
        factory.setRecordFilterStrategy(consumerRecord -> {
            log.debug("partition is {},key is {},topic is {}",
                    consumerRecord.partition(), consumerRecord.key(), consumerRecord.topic());
            return false;
        });
        return factory;
    }

    /** Builds the consumer factory from {@link #consumerConfigs()}. */
    private ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /** Assembles the kafka-clients consumer property map from the bound fields. */
    private Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }
}
package com.dtdream.analysis.config;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
/**
* Created with IntelliJ IDEA.
* User: chenqimiao
* Date: 2017/7/24
* Time: 9:43
* To change this template use File | Settings | File Templates.
*/
@ConfigurationProperties(
        prefix = "kafka.producer.default",
        ignoreInvalidFields = true
) // binds kafka.producer.default.* properties
@EnableKafka
@Configuration // makes the @Bean methods effective
public class KafkaProducerConfig {

    // NOTE(review): @ConfigurationProperties relaxed binding discovers JavaBean
    // instance properties; static setters are not seen by bean introspection, so
    // these static fields may never be populated from "kafka.producer.default.*".
    // Verify binding actually works at runtime — converting to instance fields is
    // the conventional fix, but would break any callers of the static getters.
    private static String server;
    private static Integer retries;
    private static Integer batchSize;
    private static Integer lingerMs;
    private static Integer bufferMemory;
    // Serializer classes default to StringSerializer; parameterized with <?>
    // to avoid raw-type usage (binary-compatible with the previous raw Class).
    private static Class<?> keySerializerClass = StringSerializer.class;
    private static Class<?> valueSerializerClass = StringSerializer.class;

    /** Assembles the kafka-clients producer property map from the bound fields. */
    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
        return props;
    }

    /** Builds the producer factory from {@link #producerConfigs()}. */
    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    public static String getServer() {
        return server;
    }

    public static void setServer(String server) {
        KafkaProducerConfig.server = server;
    }

    public static Integer getRetries() {
        return retries;
    }

    public static void setRetries(Integer retries) {
        KafkaProducerConfig.retries = retries;
    }

    public static Integer getBatchSize() {
        return batchSize;
    }

    public static void setBatchSize(Integer batchSize) {
        KafkaProducerConfig.batchSize = batchSize;
    }

    public static Integer getLingerMs() {
        return lingerMs;
    }

    public static void setLingerMs(Integer lingerMs) {
        KafkaProducerConfig.lingerMs = lingerMs;
    }

    public static Integer getBufferMemory() {
        return bufferMemory;
    }

    public static void setBufferMemory(Integer bufferMemory) {
        KafkaProducerConfig.bufferMemory = bufferMemory;
    }

    public static Class<?> getKeySerializerClass() {
        return keySerializerClass;
    }

    public static void setKeySerializerClass(Class<?> keySerializerClass) {
        KafkaProducerConfig.keySerializerClass = keySerializerClass;
    }

    public static Class<?> getValueSerializerClass() {
        return valueSerializerClass;
    }

    public static void setValueSerializerClass(Class<?> valueSerializerClass) {
        KafkaProducerConfig.valueSerializerClass = valueSerializerClass;
    }

    /** Shared, thread-safe template for sending String/String records. */
    @Bean(name = "kafkaTemplate")
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@RequestMapping(
        value = "/openApp",
        method = RequestMethod.POST,
        produces = MediaType.APPLICATION_JSON_UTF8_VALUE,
        consumes = MediaType.APPLICATION_JSON_UTF8_VALUE
)
@ResponseBody
public ResultDTO openApp(@RequestBody ActiveLogPushBo activeLogPushBo, HttpServletRequest request) {
    logger.info("openApp: activeLogPushBo {}, dateTime {}", JSONObject.toJSONString(activeLogPushBo), new DateTime().toString("yyyy-MM-dd HH:mm:ss.SSS"));
    // "ip" is expected to have been stored on the request earlier
    // (presumably by a filter/interceptor — TODO confirm).
    String ip = (String) request.getAttribute("ip");
    activeLogPushBo.setIp(ip);
    activeLogPushBo.setDate(new Date());
    // Publish asynchronously; downstream consumers process the open event.
    // NOTE(review): the send result is not checked — delivery failures are
    // silent here and only surface in producer logs.
    kafkaTemplate.send("data_collection_open", JSONObject.toJSONString(activeLogPushBo));
    return new ResultDTO().success();
}
package com.dtdream.analysis.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;
@Component
public class Listener {

    // static final: one logger per class, per SLF4J convention.
    private static final Logger logger = LoggerFactory.getLogger(Listener.class);

    /**
     * Consumes records from the "test-topic" topic and logs each non-null
     * payload. Records with a null value are silently skipped.
     *
     * @param record the record delivered by the listener container
     */
    @KafkaListener(topics = {"test-topic"})
    public void listen(ConsumerRecord<?, ?> record) {
        // ifPresent replaces the isPresent()/get() pair — same null-skipping
        // behavior, no unchecked get().
        Optional.ofNullable(record.value())
                .ifPresent(message -> logger.info("message is {} ", message));
    }
}
机械节能产品生产企业官网模板...
大气智能家居家具装修装饰类企业通用网站模板...
礼品公司网站模板
宽屏简约大气婚纱摄影影楼模板...
蓝白WAP手机综合医院类整站源码(独立后台)...苏ICP备2024110244号-2 苏公网安备32050702011978号 增值电信业务经营许可证编号:苏B2-20251499 | Copyright 2018 - 2025 源码网商城 (www.ymwmall.com) 版权所有