🏆作者简介,普修罗双战士,一直追求不断学习和成长,在技术的道路上持续探索和实践。
🏆多年互联网行业从业经验,历任核心研发工程师,项目技术负责人。
🎉欢迎 👍点赞?评论?收藏
大数据知识专栏学习
大数据知识云集 | 访问地址 | 备注 |
---|---|---|
大数据知识点(1) | https://blog.csdn.net/m0_50308467/article/details/134989969 | 大数据专栏 |
大数据知识点(2) | https://blog.csdn.net/m0_50308467/article/details/135109787 | 大数据专栏 |
大数据知识点(3) | https://blog.csdn.net/m0_50308467/article/details/135164698 | 大数据专栏 |
大数据知识点(4) | https://blog.csdn.net/m0_50308467/article/details/135164812 | 大数据专栏 |
大数据知识点(5) | https://blog.csdn.net/m0_50308467/article/details/135164812 | 大数据专栏 |
大数据知识点(6) | https://blog.csdn.net/m0_50308467/article/details/135313184 | 大数据专栏 |
大数据知识点(7) | https://blog.csdn.net/m0_50308467/article/details/135322179 | 大数据专栏 |
大数据知识点(8) | https://blog.csdn.net/m0_50308467/article/details/135323118 | 大数据专栏 |
大数据知识点(9) | https://blog.csdn.net/m0_50308467/article/details/135354622 | 大数据专栏 |
大数据知识点(10) | https://blog.csdn.net/m0_50308467/article/details/135366864 | 大数据专栏 |
大数据知识点(11) | https://blog.csdn.net/m0_50308467/article/details/135402222 | 大数据专栏 |
MapReduce是一种用于处理大规模数据集的编程模型和算法,是由Google提出并实现的。它的核心思想是分而治之,将大规模数据集分成多个小数据集,分配给多台计算机进行计算,并将计算结果合并得到最终的结果。MapReduce框架主要由两个步骤组成:Map和Reduce。
Map负责将输入数据切分成小的数据块,然后将每个数据块交给不同的计算节点进行处理,生成键值对形式的中间结果文件。
Reduce负责将中间结果文件按照键值归并,并进行聚合操作,生成最终的结果文件。
MapReduce工作流程如下:
输入数据的切分:首先将大规模数据集切分成多个小数据块,每个小数据块的大小通常为64MB或128MB。
Map阶段的并行计算:将每个小数据块交给不同的计算节点进行处理,使用Map函数处理数据,对于每个数据块,Map过程会生成一组键值对中间结果文件。
Shuffle阶段的数据分布:将中间结果文件按照键值进行分组,并将分组好的结果文件传递给不同的Reduce节点,以便后续的Reduce操作。
Reduce阶段的并行计算:Reduce节点对分组好的结果文件进行处理,使用Reduce函数进行聚合操作,输出最终的结果文件。
输出结果的合并:将所有Reduce节点输出的结果文件合并,得到最终的结果集。
MapReduce框架主要适用于一类大规模数据处理问题,如数据挖掘、搜索引擎索引和分布式日志分析等,因为它充分利用了分布式计算的优势,可以在分布式计算集群中高效地处理 PB 级别的数据。
在全分布式环境下,有多台计算机组成的集群,需要进行计算任务的分发、调度和结果收集等操作。为了简化操作并提高安全性,通常需要实现password-less SSH(无密码SSH)的配置。
简化操作:在全分布式环境中,需要频繁地进行计算节点之间的通信和操作。如果没有password-less SSH,每次进行SSH连接时都需要输入密码,会增加操作的繁琐程度和时间消耗。使用password-less SSH后,可以在不输入密码的情况下快速建立SSH连接,简化了操作流程。
提高效率:在分布式环境中,需要频繁进行大规模数据处理和计算任务的分发和调度。使用password-less SSH可以减少SSH连接的建立时间,提高任务分发的效率,以及减少任务处理的延迟。
加强安全性:虽然password-less SSH可以省去每次输入密码的步骤,但这并不意味着降低了安全性。相反,在正确配置的情况下,password-less SSH可以提供更高的安全性。它使用公钥-私钥加密和验证机制,不仅可以保证进行SSH连接的双方身份的验证,还可以防止密码的泄露和暴力破解攻击。
在实现password-less SSH时,通常会使用SSH密钥对,包括公钥和私钥。公钥存储在需要进行SSH连接的计算节点上,私钥存储在管理节点上。在建立SSH连接时,使用私钥进行认证和加密,完成身份验证,从而实现无密码的SSH连接。这种配置提高了操作效率,同时保证了通信的安全性。
Hadoop是一个开源的分布式计算框架,由Apache基金会开发和维护,它的出现是为了解决处理大规模数据的需求和问题。
在过去的几十年中,数据量呈指数级增长,传统的数据处理技术和工具已经无法高效地处理大规模数据。传统的数据处理方法通常是基于单台计算机进行的,随着数据量的增加,面临着存储、处理速度、可靠性等方面的挑战。
Hadoop的出现主要有以下原因:
面向大数据处理:Hadoop的设计目标是面向大规模数据处理,它能够解决PB级别的数据存储和处理问题。通过将数据分布在多个计算节点上进行并行处理,可以加速数据处理速度,提高整体系统的扩展性和容错性。
分布式计算的优势:Hadoop利用分布式计算的优势,将大规模数据分成多个小数据块,分配给多台计算机进行处理,利用多台计算机的并行计算能力来加速数据处理速度。同时,分布式计算还提供了容错和高可用性的特性,某个节点故障时,任务可以在其他节点上继续进行。
容错性和可扩展性:Hadoop具有良好的容错性和可扩展性。它可以在集群中自动备份数据,确保数据的可靠性和冗余性。同时,当需要增加数据存储和计算能力时,可以简单地增加计算节点,无需对整个系统进行重建。
生态系统的丰富性:Hadoop生态系统极为丰富,包括HDFS(Hadoop分布式文件系统)用于数据存储,MapReduce用于数据处理,以及HBase、Hive、Spark、Pig等一系列数据处理和分析工具。这些工具和组件为大数据处理提供了丰富的选择和灵活性。
总之,Hadoop的出现是为了满足处理大规模数据的需求,通过分布式计算、容错性和可扩展性等特性,解决了传统数据处理方法面临的挑战,成为目前最为广泛使用的大数据处理框架之一。
Hadoop是一个分布式系统,它由很多不同的进程组成,每个进程都扮演着不同的角色。以下是Hadoop集群中常用的守护进程:
NameNode:这是Hadoop文件系统的主节点,负责管理文件系统的命名空间和用户对文件的访问权限。如果NameNode宕机,整个文件系统将无法正常工作。
DataNode:数据节点是文件系统的工作节点,负责实际存储和管理文件的内容。每个数据节点上的文件数据会被复制到多个数据节点上,从而确保整个文件系统的可靠性和安全性。
ResourceManager:资源管理器是YARN调度模块的主节点,负责对整个集群的计算资源进行管理和分配,以便运行分布式应用程序。
NodeManager:节点管理器是YARN调度模块的工作节点,负责在集群中管理节点资源,以便后续任务能够在其上运行。
JobTracker:JobTracker是旧版的Hadoop调度模块,它负责管理MapReduce作业的执行和调度。在新版本的Hadoop中,JobTracker已被YARN ResourceManager和ApplicationMaster所取代。
TaskTracker:任务跟踪器是旧版的Hadoop调度模块的工作节点,负责在节点上执行任务,并将其报告给JobTracker。在新版本的Hadoop中,TaskTracker已被YARN NodeManager所取代。
除了以上常用的守护进程之外,Hadoop还有一些其他的守护进程,如Secondary NameNode、JournalNode、ZooKeeper等,它们分别用于备份NameNode的元数据、提供日志服务和提供分布式协作服务等。
总之,Hadoop集群中的守护进程根据其功能和作用不同,可以分为文件系统节点、作业调度节点和协调服务节点等多个类型,这些进程通常需要配合使用,以实现整个Hadoop集群的高效、可靠和安全的运行。
HBase是一个分布式的列存储数据库,其读写流程可以简单概括为以下几个步骤:
写入流程:
读取流程:
需要注意的是,HBase是分布式的,存储的数据根据行键进行水平切分,每个RegionServer负责一个或多个Region的管理和存储。读取和写入流程中的一些细节步骤可能会因具体的配置、负载均衡等因素而略有不同,但以上概述了HBase的简单读写流程。
Watermark是流数据处理中的一个重要概念,它的作用是用于定义数据流的时间边界,帮助系统判断哪些数据可以被认为是“处理完整”的。
在流数据处理中,数据是持续不断地产生的,而处理过程中可能会出现延迟、乱序等情况。Watermark的作用就是为了解决这些问题,保证数据的准确性和完整性。
Watermark一般以事件时间为基准,可以理解为事件时间的截止点。当一个数据元素的事件时间超过了Watermark时,系统将认为该数据在该Watermark之前已经全部到达。
Watermark的作用主要有以下几个方面:
乱序数据处理:当数据流中的数据经过网络传输或其他因素导致乱序时,Watermark可以帮助系统判断数据的先后顺序。只有在Watermark之前到达的数据才会被触发处理,Watermark之后到达的数据则会被暂存,直到所有可能的晚到数据都到达后再触发处理。
延迟数据处理:当数据流中的数据因为某种原因而受到延迟时,Watermark可以帮助系统判断是否需要等待更多的数据再触发处理。只有当Watermark到达一定的时间点时,系统才会触发处理,确保数据的完整性和正确性。
保证数据不丢失:通过设置适当的Watermark延迟阈值,可以在处理数据时容忍一定程度的迟到数据,从而减少数据丢失的可能性。系统可以等待一段时间以接收迟到的数据并确保处理完整性。
要保证数据不丢失,流数据处理系统需要在实现中进行适当的设计和配置:
设置合理的Watermark延迟时间:根据具体应用的需求和数据特性,设置合适的Watermark延迟时间,使得数据能够被充分处理,同时不会引入过大的延迟。
处理迟到数据:在触发数据处理的时候,需要综合考虑Watermark和事件时间,确保迟到数据能够被及时处理,而不会被丢弃。
数据持久化:对于需要对数据进行存储或者传递的情况,可以利用缓冲区或者持久化机制,确保数据在系统中的安全性和完整性。
总之,Watermark在流数据处理中起着重要的作用,通过定义时间边界,帮助系统判断数据的处理状态和处理时机,从而保证数据的准确性和完整性,降低数据丢失的风险。
在大部分分布式计算框架中,如Hadoop MapReduce,Spark等,Reduce阶段是必不可少的,因为Reduce阶段负责对Map阶段输出的中间结果进行合并和汇总。然而,有些情况下,我们可能希望去掉Reduce阶段来提高计算性能或简化计算逻辑。
有几种方法可以实现去掉Reduce阶段的需求:
使用Map-only任务:在一些特定的场景下,输入数据的处理过程并不需要合并或汇总的过程,即可通过配置为Map-only任务来去掉Reduce阶段。例如,可以设置Reduce任务数为0,或者将业务逻辑设计为只在Map阶段计算得出结果。
使用Combiner函数:Combiner函数是MapReduce框架中的一种优化技术,用于在Mapper节点上进行部分聚合操作。Combiner函数可以在Map阶段局部地对中间结果进行合并和汇总,减少Reducer节点的负载。如果能够通过适当的Combiner函数实现所需的结果,可以避免完全的Reduce阶段。
使用分布式键值对存储系统:在一些特定的场景下,可以考虑使用分布式键值对存储系统,如HBase或Redis等,将数据直接存储在这些系统中,避免经过Reduce阶段的处理。在需要使用数据的时候,可以直接从存储系统中读取,并进行相应的计算处理。
需要注意的是,在一些需要全局聚合或排序的情况下,去掉Reduce阶段可能会导致计算结果的不完整或不正确。因此,在考虑去掉Reduce阶段时,需要仔细分析业务需求和数据特点,确保不会影响计算的正确性。
总结起来,要实现去掉Reduce阶段,可以尝试使用Map-only任务、Combiner函数或分布式键值对存储系统等技术手段,但需要根据具体需求和数据特点进行合理选择和设计。
Flink作为一款分布式流数据处理框架,为了保证数据处理的准确性和完整性,提供了Exactly Once语义的支持。在Flink中,可以通过以下几个方面来保证Exactly Once语义:
分布式快照机制:Flink采用分布式快照机制记录任务全局状态,在进行容错恢复时可以从最近的一个快照点开始恢复。当任务执行过程中发生故障时,Flink会基于最近的快照点恢复任务状态,保证数据的准确性和完整性。
原子性输出:Flink的数据源和数据接收器(Sink)都支持原子性输出模式。在执行外部调用操作时,Flink将输出和成功确认的记录记录在“私有事务”中,保证确定的输出记录不会被重复写入。如果任务失败,记录或部分记录会自动回滚,避免重复处理数据。
精确一次状态更新:Flink的状态更新操作中采用了两个阶段提交协议(2PC),即预提交和提交。Flink在更新状态时,首先把状态写到内存中的状态缓存中,然后对状态进行预提交,确认状态一致性后再进行提交,从而保证每个阶段只会执行一次状态更新操作,保证精确一次状态更新。
通过以上三个方面的支持,Flink可以保证Exactly Once语义的实现。同时,Flink用户也需要在实际使用过程中,根据具体业务场景和数据特点,采用合适的并发度设置、调整窗口大小等方式,进行适当的优化和配置,以充分发挥Flink的性能和功能优势。
hadoop-env.sh是Hadoop集群的环境变量文件,用于定义一些Hadoop集群的全局配置变量。这些变量将会在Hadoop服务的运行过程中被访问,可以影响服务的配置和行为。
hadoop-env.sh文件定义的环境变量包括但不限于以下几个:
JAVA_HOME:指定Java的安装路径,Hadoop需要依赖Java运行。
HADOOP_OPTS:指定运行Hadoop程序时的JVM参数,如内存配置等。
HADOOP_LOG_DIR:指定Hadoop日志文件所在目录的路径。
HADOOP_PID_DIR:指定进程的PID文件存放目录,一般用于停止运行中的Hadoop服务。
HADOOP_USER_CLASSPATH_FIRST:如果设置为“true”,则Hadoop会优先使用用户配置的classpath,否则使用Hadoop默认的classpath设置。
hadoop-env.sh文件是在Hadoop服务启动时被执行的,并且是对Hadoop服务全局的配置,所以修改该文件需要谨慎操作,建议备份原文件以便恢复,并在修改前详细了解各个环境变量的含义以及对集群配置的影响。
总之,hadoop-env.sh文件是Hadoop集群环境变量的重要配置文件,通过修改该文件可以影响Hadoop服务的全局配置和行为,需要根据具体环境和需求进行适当的配置和修改。
实时的 TopN 计算是 Flink 常见的一个需求,Flink 提供了诸多处理数据流的算子,例如 window、reduce、aggregation 等,通过这些算子的组合可以实现实时的 TopN 计算。
下面是一个实现实时 TopN 的示例代码:
DataStream<Tuple2<String, Integer>> inputStream = ... // 输入流
int N = ... // 需要取 TopN 的 N 值
// 使用 keyed window 进行滑动计数,每个窗口会计算每个 key 的出现次数
DataStream<Tuple2<String, Integer>> keyedCounts = inputStream
.keyBy(0)
.timeWindow(Time.seconds(1), Time.seconds(1))
.sum(1);
// 使用全局窗口对 keyedCounts 进行计算 TopN 操作
DataStream<String> topNStream = keyedCounts
.windowAll(GlobalWindows.create())
.trigger(PurgingTrigger.of(CountTrigger.of(1)))
.aggregate(new TopNFunction(N));
// TopNFunction 是一个用户自定义 AggregateFunction 实现,用于计算 TopN
public static class TopNFunction implements AggregateFunction<Tuple2<String, Integer>, PriorityQueue<Tuple2<String, Integer>>, String> {
private int N;
public TopNFunction(int n) {
this.N = n;
}
@Override
public PriorityQueue<Tuple2<String, Integer>> createAccumulator() {
return new PriorityQueue<>(N, (o1, o2) -> o1.f1 - o2.f1);
}
@Override
public PriorityQueue<Tuple2<String, Integer>> add(Tuple2<String, Integer> value, PriorityQueue<Tuple2<String, Integer>> accumulator) {
accumulator.offer(value);
if (accumulator.size() > N) {
accumulator.poll();
}
return accumulator;
}
@Override
public String getResult(PriorityQueue<Tuple2<String, Integer>> accumulator) {
List<Tuple2<String, Integer>> list = new ArrayList<>();
Tuple2<String, Integer> top = accumulator.poll();
while (top != null) {
list.add(top);
top = accumulator.poll();
}
Collections.reverse(list);
return list.toString();
}
@Override
public PriorityQueue<Tuple2<String, Integer>> merge(PriorityQueue<Tuple2<String, Integer>> acc1,
PriorityQueue<Tuple2<String, Integer>> acc2) {
PriorityQueue<Tuple2<String, Integer>> result = createAccumulator();
result.addAll(acc1);
result.addAll(acc2);
while (result.size() > N) {
result.poll();
}
return result;
}
}
以上代码中,首先使用 keyBy 把输入流转换成 key-value 格式,然后使用 timeWindow 算子对 key 进行滑动窗口计数,再将计数结果通过 trigger 算子触发求每个窗口的 topN。TopNFunction 实现了 Flink 的 AggregateFunction 接口,用于计算 topN。该函数中使用 PriorityQueue 存储数据,每次添加新数据时进行优先级排序,保持队列中存储的最小值是当前 TopN 组成的元素,当队列长度超过设定的 TopN 数量时,移除队列中优先级最低的元素。
值得注意的是,使用 GlobalWindows 算子对所有 key 进行实时的计算时,需要注意窗口的触发策略,可以使用 PurgingTrigger.of(CountTrigger.of(1)) 策略,每来一个元素就触发一次计算,这样可以保证实时性,但也会存在数据倾斜的情况。
总之,通过组合 Flink 提供的算子以及自定义函数,可以实现实时的 TopN 计算,适用于实时推荐、热点监控等场景。
MapReduce是一种经典的分布式计算模型,用于处理大规模数据集。下面是一个简单的例子来说明MapReduce是如何运行的:
假设有一个包含大量文本数据的文件集合,我们想要统计每个单词在这些文件中出现的次数。
Map阶段:
a. 输入:将每个文件分成多个片段,并将这些片段分发给不同的计算节点。
b. 操作:每个计算节点对其分配到的片段进行单词计数操作。
c. 输出:每个计算节点将计算结果以键值对的形式输出,其中键是单词,值为该单词在片段中出现的次数。
Shuffle(洗牌)阶段:
a. 输入:将所有计算节点的输出进行处理,合并相同键的值。
b. 操作:将相同键的值进行合并,并按键进行排序。
c. 输出:生成一组键值对,其中键是唯一的单词,值是该单词在所有片段中出现的次数。
Reduce阶段:
a. 输入:将排序后的键值对分发给不同的计算节点。
b. 操作:每个计算节点对其分配到的键值对进行合并操作,计算出每个单词在整个文件集合中的总次数。
c. 输出:每个计算节点将计算结果以键值对的形式输出,其中键是唯一的单词,值是该单词在文件集合中出现的总次数。
最后,将所有计算节点的输出合并,得到每个单词在整个文件集合中的总次数。
通过MapReduce的并行计算,我们可以高效地处理大规模数据集,并且可以扩展到具有上千个计算节点的分布式系统。
消费者要想知道消费到哪条消息,取决于采用的消息中间件的具体实现和消费者的编写方式。以下是一些常见的方法:
消费者手动维护消费偏移量:消费者可以记录消费到消息队列中哪些消息了,然后自行维护消费偏移量,确保只消费未消费的消息。这种方式需要消费者有足够的智能和状态管理能力。
基于Apache Kafka的消息中间件:使用Kafka时,消费者可以通过获取存储在Kafka中的偏移量来跟踪其进度,并确保只消费未消费的消息。Kafka提供了多个API来获取偏移量,如Offset API、Consumption API、Rest API等。
基于RabbitMQ的消息中间件:使用RabbitMQ时,消费者可以使用AMQP(高级消息队列协议)提供的acknowledgement机制来确认消费了哪些消息。消费者需要显式地调用此机制来通知RabbitMQ特定的消息已被消费。这种方式可确保消息只被消费一次。
总之,不同的消息中间件和消费者编写方式都有不同的跟踪消息进度的方式,开发者需要深入了解实现细节来正确地跟踪消费进度。
Hadoop是一个开源的分布式计算框架,用于处理大规模数据。Hadoop2和Hadoop3是Hadoop的两个主要版本,它们之间有以下区别:
YARN上的改进:Hadoop2引入了YARN(Yet Another Resource Negotiator)作为集群资源管理器,取代了Hadoop1中使用的MapReduce作业调度器。YARN的引入使得Hadoop集群可以支持除了MapReduce之外的其他分布式计算模型,如Spark、Flink等。Hadoop3进一步改进了YARN,通过引入容器的概念和资源类型的灵活性,提高了资源利用率和集群的可伸缩性。
高可用性的改进:Hadoop2引入了NameNode的高可用性选项,通过引入Active-Standby架构,确保在主节点故障时能够快速切换到备用节点。Hadoop3改进了高可用性的机制,引入了更高效的文件系统元数据存储机制,减少了主节点故障转移的影响。
子项目的改进和新增:Hadoop3带来了一些重要的改进和新增的子项目,例如:
性能改进:Hadoop3对底层的网络和文件系统进行了一些性能改进,增强了集群的整体性能。
总的来说,Hadoop3相对于Hadoop2增加了一些新的功能和改进,提高了集群的可靠性、可伸缩性和性能,使得Hadoop更适合处理现代大规模数据分析的需求。
区别 | Hadoop2 | Hadoop3 |
---|---|---|
YARN | 使用MapReduce作业调度器进行资源管理和作业调度 | 引入了YARN作为集群资源管理器,支持除MapReduce外的计算模型 |
高可用性 | 缺乏主节点故障切换机制 | 引入了主节点高可用性选项和更高效的故障转移机制 |
子项目 | 较少的子项目 | 新增了HDFS Erasure Coding、HDFS Router等子项目 |
性能改进 | 有限的性能改进 | 对底层的网络和文件系统进行了性能改进 |
如果在MapReduce作业中不需要Reduce阶段的输出,可以使用NullWritable
作为Reduce的输出键值对类型,并将Reduce阶段的输出键值对类型设置为NullWritable
。这样就不会有实际的输出内容产生。
以下是设置MapReduce作业不输出Reduce结果的步骤:
1. 在作业的配置中,将Reduce的输出键值对类型设置为NullWritable
:
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(NullWritable.class);
2. 在Reduce函数中,不需要进行实际的输出操作:
public static class CustomReduce extends Reducer<KeyClass, ValueClass, NullWritable, NullWritable> {
public void reduce(KeyClass key, Iterable<ValueClass> values, Context context) throws IOException, InterruptedException {
// 不进行实际的输出操作
}
}
通过以上方式,可以在MapReduce作业中不输出Reduce阶段的结果。
下面是在Java语言和Python语言中实现统计出现次数最多的前N个数据的逻辑。
在Java中,可以使用HashMap来统计数据的出现次数,然后使用优先队列(PriorityQueue)来选出出现次数最多的前N个数据。
Java实现逻辑:
import java.util.*;
public class TopNFrequency {
public static List<Integer> topNFrequency(List<Integer> data, int N) {
Map<Integer, Integer> frequencyMap = new HashMap<>();
// 统计每个数据的出现次数
for (int num : data) {
frequencyMap.put(num, frequencyMap.getOrDefault(num, 0) + 1);
}
// 使用优先队列统计前N个出现次数最多的数据
PriorityQueue<Integer> queue = new PriorityQueue<>(Comparator.comparingInt(frequencyMap::get));
for (Integer num : frequencyMap.keySet()) {
queue.offer(num);
if (queue.size() > N) {
queue.poll();
}
}
// 构建结果列表
List<Integer> result = new ArrayList<>();
while (!queue.isEmpty()) {
result.add(queue.poll());
}
Collections.reverse(result); // 按出现次数最多到最少的顺序返回
return result;
}
public static void main(String[] args) {
List<Integer> data = Arrays.asList(1, 2, 3, 4, 1, 2, 3, 2, 2, 4, 5, 5, 6, 6, 6, 6);
int N = 3;
List<Integer> topN = topNFrequency(data, N);
System.out.println(topN);
}
}
在Python中,可以使用Python内置的collections.Counter来统计数据的出现次数,然后使用排序和切片操作来选择出现次数最多的前N个数据。
Python实现逻辑:
from collections import Counter
def top_n_frequency(data, N):
counter = Counter(data)
sorted_items = sorted(counter.items(), key=lambda x: x[1], reverse=True)
topN = [item[0] for item in sorted_items[:N]]
return topN
data = [1, 2, 3, 4, 1, 2, 3, 2, 2, 4, 5, 5, 6, 6, 6, 6]
N = 3
topN = top_n_frequency(data, N)
print(topN)
无论是Java还是Python,上述代码都可以统计出现次数最多的前N个数据。注意,上述实现假设数据集能够全部放入内存中处理,如果数据集过大,可能需要考虑分布式计算等其他方案。