大数据技术之数据采集与预处理
[ 发表时间:2020-12-10 18:30:31 信息来源:九剑网络 ]
来源: https://blog.csdn.net/Coder_Boy_/article/details/109477810
大数据采集架构
概述
如今,社会中各个机构、部门、公司、团体等正在实时不断地产生大量的信息,这些信息需要以简单的方式进行处理,同时又要十分准确且能迅速满足各种类型的数据(信息)需求者。这给我们带来了许多挑战,第一个挑战就是在大量的数据中收集需要的数据,下面介绍常用的大数据采集工具。
常用大数据采集工具
数据采集最传统的方式是企业自己的生产系统产生的数据,除上述生产系统中的数据外,企业的信息系统还充斥着大量的用户行为数据、日志式的活动数据、事件信息等,越来越多的企业通过架设日志采集系统来保存这些数据,希望通过这些数据获取其商业或社会价值。
Apache Kafka数据采集
6、使用Java来编写Kafka的实例
首先,编写KafkaProducer.properties文件:
zk.connect = localhost:2181
broker.list = localhost:9092
serializer.class = kafka.serializer.StringEncoder
request.required.acks = 1
•1
•2
•3
•4
下面的代码是使用Java编写了一个Kafka消息发布者:
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class MyKafkaProducer {
private Producer
private final String topic;
public MyKafkaProducer(String topic) throws Exception {
InputStream in = Properties.class.
getResourceAsStream("KafkaProducer.properties");
Properties props = new Properties();
props.load(in);
ProducerConfig config = new ProducerConfig(props);
producer = new Producer
}
public void sendMessage(String msg){
KeyedMessage
new KeyedMessage
producer.send(data);
producer.close();
}
public static void main(String[] args) throws Exception{
MyKafkaProducer producer = new MyKafkaProducer("HelloTopic");
String msg = "Hello Kafka!";
producer. sendMessage(msg);
}
}
下面创建Comsumer,首先编写KafkaProperties文件:
zk.connect = localhost:2181
group.id = testgroup
zookeeper.session.timeout.ms = 500
zookeeper.sync.time.ms = 250
auto.commit.interval.ms = 1000
•1
•2
•3
•4
•5
上述参数配置,十分容易理解,具体的详细说明,可以参考Kafka的官方文档。下面的代码是使用Java编写了一个Kafka的Comsumer。
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.consumer.Consumer;
public class MyKafkaConsumer {
private final ConsumerConnector consumer;
private final String topic;
public MyKafkaConsumer(String topic) throws Exception{
InputStream in = Properties.class.
getResourceAsStream("KafkaProducer.properties");
Properties props = new Properties();
props.load(in);
ConsumerConfig config = new ConsumerConfig(props);
consumer = Consumer.createJavaConsumerConnector(config);
this.topic = topic;
}
public void consumeMessage() {
Map
topicMap.put(topic, new Integer(1));
Map<String, List<KafkaStream
consumer.createMessageStreams(topicMap);
List<KafkaStream
consumerStreamsMap.get(topic);
for (final KafkaStream
ConsumerIterator
stream.iterator();
while (consumerIte.hasNext())
System.out.println("message :: "
+ new String(consumerIte.next().message()));
}
if (consumer != null)
consumer.shutdown();
}
public static void main(String[] args) throws Exception{
String groupId = "testgroup";
String topic = "HelloTopic";
MyKafkaConsumer consumer = new MyKafkaConsumer(topic);
consumer.consumeMessage();
}
}
数据预处理原理
通过数据预处理工作,可以使残缺的数据完整,并将错误的数据纠正、多余的数据去除,进而将所需的数据挑选出来,并且进行数据集成。数据预处理的常见方法有数据清洗、数据集成与数据变换。
数据清洗
数据集成
数据变换
数据仓库与ETL工具
数据仓库与ETL工具
数据仓库,是在企业管理和决策中面向主题的、集成的、随时间变化的、非易失性数据的集合。
数据仓库中的数据来自于多种业务数据源,这些数据源可能处于不同硬件平台上,使用不同的操作系统,数据模型也相差很远。如何获取并向数据仓库加载这些数据量大、种类多的数据,已成为建立数据仓库所面临的一个关键问题。
常用ETL工具
案例:Kettle数据迁移