In the digital advertising ecosystem, checking whether the IP address seen when an ad slot is requested matches the IP seen when it is reported is an important quality-monitoring signal. An IP mismatch may indicate a proxy, a VPN, or outright fraud, all of which cost both advertisers and publishers. This article walks through the design and implementation of an IP comparison and analysis system built on batch Kafka consumption, capable of processing massive volumes of request data efficiently and producing detailed IP-consistency reports.
The system follows a producer-consumer pattern and breaks down into three main parts: batch consumption of IP messages from Kafka, per-slot consistency analysis, and scheduled triggering with persistence of the results.
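Before the concrete classes, the heart of the pipeline can be distilled into a few lines: group messages by a slot key and compare the request IP against the report IP within each group. A minimal, framework-free sketch (the `Msg` record and its field names are simplified stand-ins for the classes defined in this article, not the production types):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SlotGroupingSketch {
    // Simplified stand-in for the article's IpMessage class
    record Msg(String channelSlotId, String mediaSlotId, String requestIp, String reportIp) {}

    public static Map<String, long[]> analyze(List<Msg> messages) {
        // Group by "channelSlotId:mediaSlotId"; value is {consistentCount, inconsistentCount}
        Map<String, long[]> counts = new HashMap<>();
        for (Msg m : messages) {
            String slotKey = m.channelSlotId() + ":" + m.mediaSlotId();
            long[] c = counts.computeIfAbsent(slotKey, k -> new long[2]);
            if (m.requestIp().equals(m.reportIp())) c[0]++; else c[1]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Msg> msgs = List.of(
            new Msg("ch1", "md1", "1.1.1.1", "1.1.1.1"),
            new Msg("ch1", "md1", "1.1.1.1", "2.2.2.2"),
            new Msg("ch2", "md9", "3.3.3.3", "3.3.3.3"));
        long[] slot1 = analyze(msgs).get("ch1:md1");
        System.out.println(slot1[0] + " consistent, " + slot1[1] + " inconsistent");
        // prints "1 consistent, 1 inconsistent"
    }
}
```

Everything else in the article is scaffolding around this comparison: getting the messages in efficiently, and getting the per-slot tallies out durably.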
First, define the data structure for IP messages:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class IpMessage {
private String requestId; // unique request ID
private String channelSlotId; // channel ad-slot ID
private String mediaSlotId; // media ad-slot ID
private String requestIp; // IP seen at request time
private String reportIp; // IP seen at report time
private long timestamp; // epoch millisecond timestamp
}

The data structure for the analysis result:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class SlotAnalysisResult {
private String channelSlotId; // channel ad-slot ID
private String mediaSlotId; // media ad-slot ID
private String analysisHour; // hour being analyzed (yyyyMMddHH)
private int totalCount; // total number of messages
private int consistentCount; // messages with matching IPs
private int inconsistentCount; // messages with mismatched IPs
private List<String> inconsistentExamples; // sample mismatched IP pairs
private List<String> inconsistentRegionExamples; // region comparison for mismatches
private String mostFrequentIp; // most frequently seen IP
private int mostFrequentCount; // occurrence count of that IP
}

Efficient batch consumption requires some specific Kafka consumer settings:
@Configuration
@EnableScheduling
public class KafkaBatchConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ConsumerFactory<String, String> batchConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "ip-analysis-batch-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000); // max records per batch poll
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); // max wait before the broker responds
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // commit offsets manually
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> batchFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(batchConsumerFactory());
factory.setBatchListener(true); // enable batch listening
return factory;
}
}

The core batch-consumption logic pulls the target hour's data from Kafka:
@Component
public class KafkaBatchConsumer {
private static final String TOPIC = "ip-collection-topic";
private static final long POLL_TIMEOUT = 1000; // 1 second
@Autowired
private ConsumerFactory<String, String> consumerFactory;
@Autowired
private IpAnalysisService ipAnalysisService;
/**
* Batch-consume all data for the specified hour
*/
public void consumeHourData(String targetHour) {
Consumer<String, String> consumer = consumerFactory.createConsumer();
consumer.subscribe(Collections.singletonList(TOPIC));
try {
Map<String, List<IpMessage>> slotMessagesMap = new HashMap<>();
boolean hasMoreData = true;
int totalMessages = 0;
while (hasMoreData) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
if (records.isEmpty()) {
hasMoreData = false;
continue;
}
for (ConsumerRecord<String, String> record : records) {
IpMessage message = parseMessage(record.value());
if (message == null) {
continue; // skip messages that failed to parse
}
// Only keep data belonging to the target hour
String messageHour = formatHour(message.getTimestamp());
if (targetHour.equals(messageHour)) {
String slotKey = message.getChannelSlotId() + ":" + message.getMediaSlotId();
slotMessagesMap.computeIfAbsent(slotKey, k -> new ArrayList<>()).add(message);
totalMessages++;
}
}
// Process early any slot that has already accumulated 10,000 messages
processFullSlots(slotMessagesMap, targetHour);
// Commit offsets manually
consumer.commitSync();
// Safety limit: process at most one million messages
if (totalMessages > 1000000) {
break;
}
}
// Process the remaining messages
processRemainingSlots(slotMessagesMap, targetHour);
} finally {
consumer.close();
}
}
// Process slots that have accumulated 10,000 or more messages
private void processFullSlots(Map<String, List<IpMessage>> slotMessagesMap, String targetHour) {
Iterator<Map.Entry<String, List<IpMessage>>> iterator = slotMessagesMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, List<IpMessage>> entry = iterator.next();
if (entry.getValue().size() >= 10000) {
analyzeAndSaveSlot(entry.getKey(), entry.getValue(), targetHour);
iterator.remove();
}
}
}
// Process everything left over once consumption finishes
private void processRemainingSlots(Map<String, List<IpMessage>> slotMessagesMap, String targetHour) {
slotMessagesMap.forEach((slotKey, messages) -> analyzeAndSaveSlot(slotKey, messages, targetHour));
slotMessagesMap.clear();
}
// Split the slot key back into its two IDs, run the analysis, and persist the result
private void analyzeAndSaveSlot(String slotKey, List<IpMessage> messages, String targetHour) {
String[] ids = slotKey.split(":", 2);
SlotAnalysisResult result = ipAnalysisService.analyzeSlotMessages(ids[0], ids[1], messages, targetHour);
ipAnalysisService.saveAnalysisResult(result);
}
// Parse a message payload
private IpMessage parseMessage(String messageJson) {
try {
return JSON.parseObject(messageJson, IpMessage.class);
} catch (Exception e) {
// Messages that fail to parse are returned as null and skipped by the caller
return null;
}
}
// Format a timestamp into an hour string (yyyyMMddHH)
private String formatHour(long timestamp) {
return Instant.ofEpochMilli(timestamp)
.atZone(ZoneId.systemDefault())
.format(DateTimeFormatter.ofPattern("yyyyMMddHH"));
}
}

The analysis service implements the core business logic: IP comparison, frequency statistics, and region analysis:
@Service
public class IpAnalysisService {
@Autowired
private IpRegionService ipRegionService;
@Autowired
private IpAnalysisResultRepository resultRepository;
public SlotAnalysisResult analyzeSlotMessages(String channelSlotId, String mediaSlotId,
List<IpMessage> messages, String analysisHour) {
int totalCount = messages.size();
int consistentCount = 0;
int inconsistentCount = 0;
List<String> inconsistentExamples = new ArrayList<>();
List<String> inconsistentRegionExamples = new ArrayList<>();
Map<String, Integer> ipFrequency = new HashMap<>();
for (IpMessage message : messages) {
if (message == null) continue;
// Count IP frequency
countIpFrequency(ipFrequency, message.getRequestIp());
countIpFrequency(ipFrequency, message.getReportIp());
if (Objects.equals(message.getRequestIp(), message.getReportIp())) { // null-safe comparison
consistentCount++;
} else {
inconsistentCount++;
// Record mismatch examples (at most 100)
if (inconsistentExamples.size() < 100) {
inconsistentExamples.add(message.getRequestIp() + ":" + message.getReportIp());
// Look up the region of each IP
String requestRegion = ipRegionService.getIpRegion(message.getRequestIp());
String reportRegion = ipRegionService.getIpRegion(message.getReportIp());
inconsistentRegionExamples.add(requestRegion + ":" + reportRegion);
}
}
}
// Find the IP with the highest occurrence count
Map.Entry<String, Integer> mostFrequent = findMostFrequentIp(ipFrequency);
return new SlotAnalysisResult(
channelSlotId, mediaSlotId, analysisHour, totalCount, consistentCount, inconsistentCount,
inconsistentExamples, inconsistentRegionExamples,
mostFrequent != null ? mostFrequent.getKey() : null,
mostFrequent != null ? mostFrequent.getValue() : 0
);
}
// Tally the frequency of a single IP
private void countIpFrequency(Map<String, Integer> frequencyMap, String ip) {
if (ip != null && !ip.trim().isEmpty()) {
frequencyMap.put(ip, frequencyMap.getOrDefault(ip, 0) + 1);
}
}
// Find the most frequent IP
private Map.Entry<String, Integer> findMostFrequentIp(Map<String, Integer> frequencyMap) {
return frequencyMap.entrySet().stream()
.max(Map.Entry.comparingByValue())
.orElse(null);
}
// Persist the analysis result to the database
@Transactional
public void saveAnalysisResult(SlotAnalysisResult result) {
IpAnalysisResult entity = new IpAnalysisResult();
entity.setChannelSlotId(result.getChannelSlotId());
entity.setMediaSlotId(result.getMediaSlotId());
entity.setAnalysisHour(result.getAnalysisHour());
entity.setTotalCount(result.getTotalCount());
entity.setConsistentCount(result.getConsistentCount());
entity.setInconsistentCount(result.getInconsistentCount());
entity.setInconsistentExamples(JSON.toJSONString(result.getInconsistentExamples()));
entity.setInconsistentRegionExamples(JSON.toJSONString(result.getInconsistentRegionExamples()));
entity.setMostFrequentIp(result.getMostFrequentIp());
entity.setMostFrequentCount(result.getMostFrequentCount());
entity.setCreateTime(new Date());
resultRepository.save(entity);
}
}

A Spring scheduled task fires at five minutes past every hour and analyzes the previous hour's data:
@Slf4j
@Component
public class IpAnalysisScheduler {
@Autowired
private KafkaBatchConsumer kafkaBatchConsumer;
// Runs at second 0 of minute 5 every hour, processing the previous hour's data
// (Spring cron has six fields: second minute hour day-of-month month day-of-week)
@Scheduled(cron = "0 5 * * * *")
public void analyzePreviousHour() {
String previousHour = LocalDateTime.now().minusHours(1)
.format(DateTimeFormatter.ofPattern("yyyyMMddHH"));
log.info("Starting analysis for hour {}", previousHour);
try {
kafkaBatchConsumer.consumeHourData(previousHour);
log.info("Finished analysis for hour {}", previousHour);
} catch (Exception e) {
log.error("Error while analyzing hour {}", previousHour, e);
}
}
}

The table schema for the analysis results:
@Entity
@Table(name = "ip_analysis_result", indexes = {
@Index(name = "idx_slot_hour", columnList = "channelSlotId,mediaSlotId,analysisHour"),
@Index(name = "idx_hour", columnList = "analysisHour")
})
@Data
public class IpAnalysisResult {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "channel_slot_id", length = 100)
private String channelSlotId;
@Column(name = "media_slot_id", length = 100)
private String mediaSlotId;
@Column(name = "analysis_hour", length = 10)
private String analysisHour;
@Column(name = "total_count")
private Integer totalCount;
@Column(name = "consistent_count")
private Integer consistentCount;
@Column(name = "inconsistent_count")
private Integer inconsistentCount;
@Column(name = "inconsistent_examples", length = 4000)
private String inconsistentExamples;
@Column(name = "inconsistent_region_examples", length = 4000)
private String inconsistentRegionExamples;
@Column(name = "most_frequent_ip", length = 50)
private String mostFrequentIp;
@Column(name = "most_frequent_count")
private Integer mostFrequentCount;
@Column(name = "create_time")
private Date createTime;
}

On the tuning side, a large MAX_POLL_RECORDS lets each poll fetch a big batch of records, cutting per-message network overhead; together with manual offset commits and per-slot early processing, the system has performed well in production.
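Correctness also rests on the hour-window filtering in the consumer: formatHour must produce the same yyyyMMddHH string for every message in the target hour. A standalone sketch of that behavior (pinned to UTC here so the output is deterministic; the production code uses the system default zone):

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class HourKeySketch {
    // Same formatting logic as the consumer's formatHour, with the zone made explicit
    static String formatHour(long timestampMillis, ZoneId zone) {
        return Instant.ofEpochMilli(timestampMillis)
                .atZone(zone)
                .format(DateTimeFormatter.ofPattern("yyyyMMddHH"));
    }

    public static void main(String[] args) {
        ZoneId utc = ZoneId.of("UTC");
        // Epoch millisecond 0 is 1970-01-01T00:00:00Z
        System.out.println(formatHour(0L, utc)); // prints "1970010100"
        // Two timestamps 30 minutes apart inside the same hour share one key
        long base = 1_700_000_000_000L; // 2023-11-14T22:13:20Z
        System.out.println(formatHour(base, utc).equals(formatHour(base + 30L * 60 * 1000, utc))); // prints "true"
    }
}
```

Because the key includes the date, a late-arriving message from an earlier hour can never be silently folded into the current window; it simply fails the targetHour comparison and is dropped.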
This article has detailed the design and implementation of an IP comparison and analysis system based on batch Kafka consumption. With a sound architecture, a batch-processing mechanism, and targeted tuning, the system can efficiently process massive volumes of ad-slot request data and deliver accurate IP-consistency results. The same design extends beyond IP comparison to any scenario that calls for batch processing of real-time data.
The system's core advantage is consuming directly from Kafka in batches, which removes any intermediate storage layer and with it a good deal of complexity and latency. Triggering analysis from a scheduled task keeps the data complete and the analysis accurate. The architecture delivers high performance while remaining extensible and reliable.
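As a closing illustration, the core of analyzeSlotMessages, pairwise comparison plus most-frequent-IP tracking, can be exercised without Spring, Kafka, or a database. All names and types here are pared down for the sketch and are not the production classes:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IpCompareSketch {
    // Each element of ipPairs is {requestIp, reportIp}; returns the most frequent IP
    public static Map.Entry<String, Integer> mostFrequent(List<String[]> ipPairs) {
        Map<String, Integer> freq = new HashMap<>();
        int consistent = 0, inconsistent = 0;
        for (String[] pair : ipPairs) {
            // As in the article, both the request IP and the report IP count toward frequency
            freq.merge(pair[0], 1, Integer::sum);
            freq.merge(pair[1], 1, Integer::sum);
            if (pair[0].equals(pair[1])) consistent++; else inconsistent++;
        }
        System.out.println(consistent + " consistent, " + inconsistent + " inconsistent");
        return freq.entrySet().stream().max(Map.Entry.comparingByValue()).orElse(null);
    }

    public static void main(String[] args) {
        List<String[]> pairs = List.of(
            new String[]{"10.0.0.1", "10.0.0.1"},
            new String[]{"10.0.0.1", "172.16.0.9"},
            new String[]{"10.0.0.1", "10.0.0.1"});
        Map.Entry<String, Integer> top = mostFrequent(pairs);
        // prints "2 consistent, 1 inconsistent" then "10.0.0.1 seen 5 times"
        System.out.println(top.getKey() + " seen " + top.getValue() + " times");
    }
}
```

Note a design consequence visible even in this sketch: because both sides of every pair are tallied, a single IP behind heavy proxy traffic dominates mostFrequentIp quickly, which is exactly the signal the report is meant to surface.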