//发送的第一步是添加到需要发送的列表中。
protected void setWaitingToEmit(ConsumerRecordsK, V ConsumerRecords) {
for (TopicPartition tp :consumerRecords.partitions()) {
waitingToEmit.put(tp, new ArrayList(consumerRecords.records(tp)));
}
}
受保护的无效emitIfWaitingNotEmited(){
//从waitingToEmit列表中获取要发送的事件并开始发送
IteratorListConsumerRecordK, V waitingToEmitIter=waitingToEmit.values().iterator();
LOG.info('真实事件发出开始');
外环:
while (waitingToEmitIter.hasNext()) {
ListConsumerRecordK, V waitingToEmitForTp=waitingToEmitIter.next();
while (!waitingToEmitForTp.isEmpty()) {
最终布尔值已发射Tuple=emitOrRetryTuple(waitingToEmitForTp.remove(0));
如果(发射元组){
LOG.error('事件发出失败');
打破外循环;
}
}
waitingToEmitIter.remove();
}
}
//实际发送过程
私人布尔emitOrRetryTuple(ConsumerRecordK,V记录){
最终TopicPartition tp=new TopicPartition(record.topic(), record.partition());
最终KafkaSpoutMessageId msgId=retryService.getMessageId(record);
if (offsetManagers.containsKey(tp) offsetManagers.get(tp).contains(msgId)) { //已被确认
LOG.trace('记录[{}] 的元组已被确认。正在跳过', record);
} else if (emissed.contains(msgId)) { //已发出并且正在等待确认或失败
LOG.trace('记录[{}] 的元组已发出。正在跳过', record);
} 别的{
最终OffsetAndMetadata commitOffset=kafkaConsumer.commissed(tp);
如果(isAtLeastOnceProcessing()
已提交偏移!=null
commitOffset.offset() record.offset()
commitMetadataManager.isOffsetCommitedByThisTopology(tp,commitedOffset,Collections.unmodifyingMap(offsetManagers))){
//确保在启动具有此id 的拓扑后,消费者会获取
//位置永远不会落后于提交的偏移量(STORM-2844)
throw new IllegalStateException('尝试发出已提交的消息。'
+ '当使用至少一次处理保证时,永远不会发生这种情况。');
}
最终ListObject元组=kafkaSpoutConfig.getTranslator().apply(record);
如果(isEmitTuple(元组)){
最终布尔isScheduled=retryService.isScheduled(msgId);
//未调度=从未失败(即从未发出),或已调度并准备好重试
if (!isScheduled || retryService.isReady(msgId)) {
最终字符串流=KafkaTuple 的元组实例? ((KafkaTuple)元组).getStream() : Utils.DEFAULT_STREAM_ID;
if (!isAtLeastOnceProcessing()) {
如果(kafkaSpoutConfig.isTupleTrackingEnforced()){
收集器.emit(流,元组,msgId);
LOG.trace('使用msgId [{}] 为记录[{}] 发出元组[{}]', tuple, record, msgId);
} 别的{
收集器.emit(流,元组);
LOG.trace('为记录[{}] 发出元组[{}]', tuple, record);
}
} 别的{
发射。添加(msgId);
offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
if (isScheduled) { //已安排重试并重新发出,因此从计划中删除。
retryService.remove(msgId);
}
收集器.emit(流,元组,msgId);
tupleListener.onEmit(tuple, msgId);
LOG.trace('使用msgId [{}] 为记录[{}] 发出元组[{}]', tuple, record, msgId);
}
返回真;
}
} 别的{
/*如果空元组未配置为发出,则应将其标记为已发出并立即确认
* 允许将其偏移量提交给Kafka*/
LOG.debug('不按照配置中的定义发出记录[{}] 的空元组。', record);
如果(isAtLeastOnceProcessing()){
msgId.setNullTuple(true);
offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
确认(消息ID);
}
}
}
返回假;
}
标题:Kafka与Storm的集成
链接:https://www.gbbxw.com/news/rj/20178.html
版权:文章转载自网络,如有侵权,请联系删除!
用户评论
这款名为“Kafka和Storm的整合”的游戏简直是文学与冒险的完美融合,给玩家带来了一次沉浸式的体验。
有20位网友表示赞同!
"Kafka和Storm的整合"不仅是一款游戏,更像是一个故事的重新演绎,让玩家深入感受其神秘的魅力。
有6位网友表示赞同!
在“Kafka和Storm的整合”中你能够感受到独特的叙事艺术和紧张的冒险元素巧妙结合,非常令人着迷!
有7位网友表示赞同!
这款游戏以其创新的理念和对经典作品Kafka和风暴的精彩诠释赢得了我的喜爱。特别是它的故事情节,让人难以忘怀。
有5位网友表示赞同!
如果你是Kafka或Storm粉丝或者是热爱探险的人,“Kafka和Storm整合”将是一个让你无法放下的游戏。
有19位网友表示赞同!
"Kafka和Storm的整合"成功地将两者的特色融入在一起,为玩家提供了一个既新颖又充满深度的游戏环境。
有5位网友表示赞同!
从视觉到声音,这款游戏在每个细节上都做得很好,让人仿佛置身于一个充满想象的世界中。
有12位网友表示赞同!
“Kafka和Storm的整合”游戏不仅故事引人入胜,其角色设计也很出彩,每一步都能感受到精良制作。
有18位网友表示赞同!
"Kafka"和"Storm"的结合在这款游戏中展现得淋漓尽致,不仅好玩刺激,更带有一种深刻的艺术情感体验。
有17位网友表示赞同!
在“Kafka和Storm整合”中探索未知的世界,每个角落都充满了惊喜。特别推荐给喜欢冒险和深度游戏剧情的玩家。
有14位网友表示赞同!
"Kafka和Storm"的融合给人一个全新的视角看待世界,在这款游戏中我发现了很多细节之中的美好,令人回味无穷。
有16位网友表示赞同!
这款游戏让人感觉既古老又充满现代感,"Kafka和Storm的整合"是一个完美的结合体,值得一试!
有20位网友表示赞同!
玩家能够通过“Kafka和Storm整合”游戏深入探索故事背景下的隐秘角落,体验了一次独特的冒险之旅。
有8位网友表示赞同!
"Kafka和Storm"两者元素在这款游戏中完美融合,不仅故事情节吸引人,操作上也很流畅自然。
有12位网友表示赞同!
无论你是小说爱好者还是游戏狂热者,“Kafka和Storm的整合”都是你不能错过的杰作。沉浸其中,探索无尽的世界吧!
有5位网友表示赞同!
这个游戏通过“Kafka”的阴郁与“Storm”的壮观,在虚拟世界中创造了一场视觉和情感上的盛宴。
有9位网友表示赞同!
"Kafka"的世界结合了"Storm"的狂暴,使得“Kafka和Storm整合”成为一个不可多得的游戏体验。强烈推荐给喜欢挑战与新奇元素的玩家。
有9位网友表示赞同!
深入这款游戏可以感受到作者对细节的关注以及深邃的艺术创作力。无论是情节还是角色设计,“Kafka和Storm的整合”都是精品之作。
有6位网友表示赞同!
"Kafka"的魅力加上“Storm”的激情,这款整合游戏在艺术和技术上都展现了卓越水准,是一次令人震撼的游戏旅程。
有10位网友表示赞同!