1、flume配置文件
agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /opt/soft/tomcatloging/logs/test.log
agent1.sources.r1.batchSize = 10
agent1.sources.r1.channels= c1
agent1.sinks.k1.type = org.apache.flume.sink.KafkaSink
agent1.sinks.k1.channel = c1
agent1.sinks.k1.metadata.broker.list = 172.18.90.51:9092
agent1.sinks.k1.serializer.class = kafka.serializer.StringEncoder
agent1.sinks.k1.request.required.acks = 1
agent1.sinks.k1.custom.topic.name = test22
agent1.channels.c1.type=memory
agent1.channels.c1.capacity=10000
agent1.channels.c1.transactionCapacity=500
agent1.channels.c1.keep-alive=30
2、KafkaSink类
package org.apache.flume.sink;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
public class KafkaSink extends AbstractSink implements Configurable{
private static final Logger logger = LoggerFactory.getLogger(AbstractSink.class);
public static final String PARTITION_KEY_NAME = "custom.partition.key";
public static final String ENCODING_KEY_NAME = "custom.encoding";
public static final String DEFAULT_ENCODING = "UTF-8";
public static final String CUSTOME_TOPIC_KEY_NAME = "custom.topic.name";
public static final String CUSTOME_CONSUMER_THREAD_COUNT_KEY_NAME = "custom.thread.per.consumer";
private Properties parameters;
private Producer<String, String> producer;
@Override
public synchronized void start() {
super.start();
ProducerConfig config = new ProducerConfig(parameters);
this.producer = new Producer<String,String>(config);
}
@Override
public Status process() throws EventDeliveryException {
Status status = null;
//start transaction
Channel ch = getChannel();
Transaction tx = ch.getTransaction();
tx.begin();
try{
Event event = ch.take();
String partitionKey = (String)parameters.get(PARTITION_KEY_NAME);
String encoding = StringUtils.defaultIfEmpty((String)parameters.get(ENCODING_KEY_NAME), DEFAULT_ENCODING);
String topic = Preconditions.checkNotNull((String)parameters.get(CUSTOME_TOPIC_KEY_NAME), "custom.topic.name is required");
String eventData = new String(event.getBody(),encoding);
KeyedMessage<String, String> data;
if(StringUtils.isEmpty(partitionKey)){
data = new KeyedMessage<String, String>(topic, eventData);
}else{
data = new KeyedMessage<String, String>(topic,partitionKey, eventData);
}
if(logger.isInfoEnabled()){
logger.info("Send Message to Kafka : [" + eventData + "] -- [" + EventHelper.dumpEvent(event) + "]");
}
producer.send(data);
tx.commit();
status = Status.READY;
}catch(Throwable t){
tx.rollback();
status = Status.BACKOFF;
if(t instanceof Error){
throw (Error)t;
}
}finally{
tx.close();
}
return status;
}
@Override
public void configure(Context context) {
ImmutableMap<String, String> props = context.getParameters();
parameters = new Properties();
for(String key : props.keySet()){
String value = props.get(key);
parameters.put(key, value);
}
}
@Override
public synchronized void stop() {
producer.close();
}
}
3、把相关的kafka及scala包导入到flume的lib中。
分享到:
相关推荐
flume-kafka此插件用于将flume及kafka集成,其中flume支持的版本为flume-ng 1.3.1及以上, kafka为2.10_0.8.2.0及以上
|____kafka第01天-09.flume与kafka集成.avi |____kafka第01天-08.通过java API编程实现kafka消息消费者.avi |____kafka第01天-07.通过java API编程实现kafka消息生产者.avi |____kafka第01天-06.kafka手动分区再平衡...
flume kafka storm集成源代码和文档介绍
目标是将 Flume 与 Kafka 集成,以便基于拉式的处理系统(如可以处理来自各种 Flume 源(如 Syslog)的数据。 这现在是官方 Flume 发行版(从 v1.6 开始)的一部分,并有重大改进。 更新 2014 年 8 月 23 日 - 此...
16:Flume+HBase+Kafka集成开发
Log4j直接发送数据到Flume + Kafka (方式一) 通过flume收集系统日记, 收集的方式通常采用以下. 系统logs直接发送给flume系统, 本文主要记录种方式进行说明. 文章链接,请看:...
IT十八掌第三期配套笔记! 1、kafka消息系统的介绍 2、producer有分区类 3、kafka支持的副本模式 4、kafka消费者偏移量考察 5、kafka自定义消费者 6、kafka自定义生产者 ...8、flume集成kafka的几种方式
01_尚硅谷_Kafka_课程介绍.avi 02_尚硅谷_Kafka_...尚硅谷_Kafka_低级API之测试.avi 21_尚硅谷_Kafka_扩展.avi 22_尚硅谷_Kafka_拦截器.avi 23__尚硅谷_Kafka_KafkaStream.avi 24__尚硅谷_Kafka_与Flume对比及集成.avi
后来读了flume中有关与kafka集成的代码(org.apache.flume.source.kafka.KafkaSource),觉得其中使用kafka Consumer的方式比较合理,可以实现消息的批处理,所以就参考并封装了一下,变得更加通用,形成了这个项目。...
Apache Hadoop生态圈的顶级项目之一,解决了传统Lamda架构处理Hadoop上快速变化数据的存储和处理技术过于复杂的问题,同时Kudu能够与Hadoop生态的其他组件比如Impala、Spark、Flume和Kafka等组件集成,大大降低了对...
Apache Hadoop生态圈的顶级项目之一,解决了传统Lamda架构处理Hadoop上快速变化数据的存储和处理技术过于复杂的问题,同时Kudu能够与Hadoop生态的其他组件比如Impala、Spark、Flume和Kafka等组件集成,大大降低了对...
而流处理则是直接对运动中的数据的处理,在接收数据时直接计算数据。 大多数数据都是连续的流:传感器事件,网站...+ Spark Streaming 可以从 HDFS,Flume,Kafka,Twitter 和 ZeroMQ 读取数据,也支持自定义数据源。
Apache原生框架介绍中涉及到的技术框架包括Flume、Kafka、Sqoop、MySql、HDFS、Hive、Tez、Spark、Presto、Druid等,CDH版本框架讲解包括CM的安装部署、Hadoop、Zookeeper、Hive、Flume、Kafka、Oozie、Impala、HUE...
CDH版本框架讲解包括CM的安装部署、Hadoop、Zookeeper、Hive、Flume、Kafka、Oozie、Impala、HUE、Kudu、Spark的安装配置,透彻了解不同版本框架的区别联系,将大数据全生态系统前沿技术一网打尽。在过程中对大数据...
目录 1.1_大数据时代 1.2_大数据的应用领域-大数据解决方案 2.1_HDFS概述及应用场景-HDFS系统架构 2.2_关键特性介绍 3.1_MapReduce和Yarn基本介绍-MapReduce和Yarn功能...11.1_Kafka简介-Kafka架构与功能 .............
11.2_Kafka架构与功能-Kafka关键流程 第十二章 ZooKeeper集群分布式协调服务 12.1_ZooKeeper简介-与组件的关系 第十三章 FusionInsight HD 解决方案介绍 13.1_FusionInsight概述-FusionInsight特性介绍 13.2_...
本资源为大数据基础到中...159_kafka与flume集成-source集成- _, G+ K) y% I4 D" q9 \ 160_kafka与flume集成-sink集成4 o6 W; v5 a; p9 s. X% I7 @ 161_kafka与flume集成-channel集成/ x' w3 g3 z& d: w 162_kafka简介!
flume+springboot+kafka+sparkStream+mysql集成的代码,对应系列步骤:https://blog.csdn.net/mojir/article/details/95667896
目录网盘文件永久链接 1.1 大数据的定义及其应用领域 ...8.2 Flink的底层原理和集成情况 9.1 Loader数据转换 10.1 Flume的关键流程和特性 10.2 Flume操作实例 11.1 Kafka分布式消息订阅系统 .........
8.2.1 Flink的底层原理和集成情况 9.1.1 Loader数据转换 10.1.1 Flume的关键流程和特性 10.2.1 Flume操作实例 11.1.1 Kafka分布式消息订阅系统 12.1.1 ZooKeeper集群分布式协调服务 13.1.1 FusionInsight HD ...