`

flume与kafka集成

阅读更多
1、flume配置文件

agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1

agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /opt/soft/tomcatloging/logs/test.log
agent1.sources.r1.batchSize = 10
agent1.sources.r1.channels= c1

agent1.sinks.k1.type = org.apache.flume.sink.KafkaSink
agent1.sinks.k1.channel = c1
agent1.sinks.k1.metadata.broker.list = 172.18.90.51:9092
agent1.sinks.k1.serializer.class = kafka.serializer.StringEncoder
agent1.sinks.k1.request.required.acks = 1
agent1.sinks.k1.custom.topic.name = test22

agent1.channels.c1.type=memory
agent1.channels.c1.capacity=10000
agent1.channels.c1.transactionCapacity=500
agent1.channels.c1.keep-alive=30


2、KafkaSink类

package org.apache.flume.sink;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.commons.lang.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;

public class KafkaSink extends AbstractSink implements Configurable{
	
	private static final Logger logger = LoggerFactory.getLogger(AbstractSink.class);
	
	public static final String PARTITION_KEY_NAME = "custom.partition.key";
    public static final String ENCODING_KEY_NAME = "custom.encoding";
    public static final String DEFAULT_ENCODING = "UTF-8";
    public static final String CUSTOME_TOPIC_KEY_NAME = "custom.topic.name";
    public static final String CUSTOME_CONSUMER_THREAD_COUNT_KEY_NAME = "custom.thread.per.consumer";

	private Properties parameters;
	private Producer<String, String> producer;
	
	@Override
	public synchronized void start() {
		super.start();
		ProducerConfig config = new ProducerConfig(parameters);
		this.producer = new Producer<String,String>(config);
	}
	
	@Override
	public Status process() throws EventDeliveryException {
		
		Status status = null;
		
		//start transaction
		Channel ch = getChannel();
		Transaction tx = ch.getTransaction();
		tx.begin();
		
		try{
			Event event = ch.take();
			
			String partitionKey = (String)parameters.get(PARTITION_KEY_NAME);
			String encoding = StringUtils.defaultIfEmpty((String)parameters.get(ENCODING_KEY_NAME), DEFAULT_ENCODING);
			String topic = Preconditions.checkNotNull((String)parameters.get(CUSTOME_TOPIC_KEY_NAME), "custom.topic.name is required");
			
			String eventData = new String(event.getBody(),encoding);
			
			KeyedMessage<String, String> data;
			
			if(StringUtils.isEmpty(partitionKey)){
				data = new KeyedMessage<String, String>(topic, eventData);
			}else{
				data = new KeyedMessage<String, String>(topic,partitionKey, eventData);
			}
			
			if(logger.isInfoEnabled()){
				logger.info("Send Message to Kafka : [" + eventData + "] -- [" + EventHelper.dumpEvent(event) + "]");
			}
			
			producer.send(data);
			tx.commit();
			status = Status.READY;
		}catch(Throwable t){
			tx.rollback();
			status = Status.BACKOFF;
			if(t instanceof Error){
				throw (Error)t;
			}
		}finally{
			tx.close();
		}
		
		return status;
	}

	@Override
	public void configure(Context context) {
		ImmutableMap<String, String> props = context.getParameters();
		
		parameters = new Properties();
		for(String key : props.keySet()){
			String value = props.get(key);
			parameters.put(key, value);
		}
	}

	
	@Override
	public synchronized void stop() {
		producer.close();
	}

}



3、把相关的kafka及scala包导入到flume的lib中。
分享到:
评论

相关推荐

    flume-kafka:此插件用于将flume及kafka集成,其中flume支持的版本为flume-ng 1.3.1及以上, kafka为2.10_0.8.2.0及以上

    flume-kafka此插件用于将flume及kafka集成,其中flume支持的版本为flume-ng 1.3.1及以上, kafka为2.10_0.8.2.0及以上

    老男孩大数据kafka视频教程

    |____kafka第01天-09.flume与kafka集成.avi |____kafka第01天-08.通过java API编程实现kafka消息消费者.avi |____kafka第01天-07.通过java API编程实现kafka消息生产者.avi |____kafka第01天-06.kafka手动分区再平衡...

    flume-kafka-storm源程序

    flume kafka storm集成源代码和文档介绍

    flume-ng-kafka-sink:将数据发布到 Apache Kafka 的 Apache Flume Sink 实现

    目标是将 Flume 与 Kafka 集成,以便基于拉式的处理系统(如可以处理来自各种 Flume 源(如 Syslog)的数据。 这现在是官方 Flume 发行版(从 v1.6 开始)的一部分,并有重大改进。 更新 2014 年 8 月 23 日 - 此...

    16:Flume+HBase+Kafka集成开发.rar

    16:Flume+HBase+Kafka集成开发

    springboot_log4j2_flume

    Log4j直接发送数据到Flume + Kafka (方式一) 通过flume收集系统日记, 收集的方式通常采用以下. 系统logs直接发送给flume系统, 本文主要记录种方式进行说明. 文章链接,请看:...

    IT十八掌_kafka阶段学习笔记(课堂笔记与原理图解)

    IT十八掌第三期配套笔记! 1、kafka消息系统的介绍 2、producer有分区类 3、kafka支持的副本模式 4、kafka消费者偏移量考察 5、kafka自定义消费者 6、kafka自定义生产者 ...8、flume集成kafka的几种方式

    尚硅谷大数据视频_Kafka视频教程

    01_尚硅谷_Kafka_课程介绍.avi 02_尚硅谷_Kafka_...尚硅谷_Kafka_低级API之测试.avi 21_尚硅谷_Kafka_扩展.avi 22_尚硅谷_Kafka_拦截器.avi 23__尚硅谷_Kafka_KafkaStream.avi 24__尚硅谷_Kafka_与Flume对比及集成.avi

    kafka-effective:更有效地使用apache kafka消费者和生产者

    后来读了flume中有关与kafka集成的代码(org.apache.flume.source.kafka.KafkaSource),觉得其中使用kafka Consumer的方式比较合理,可以实现消息的批处理,所以就参考并封装了一下,变得更加通用,形成了这个项目。...

    利用Impala+Kudu构建准实时分析应用

    Apache Hadoop生态圈的顶级项目之一,解决了传统Lamda架构处理Hadoop上快速变化数据的存储和处理技术过于复杂的问题,同时Kudu能够与Hadoop生态的其他组件比如Impala、Spark、Flume和Kafka等组件集成,大大降低了对...

    构建近实时分析系统

    Apache Hadoop生态圈的顶级项目之一,解决了传统Lamda架构处理Hadoop上快速变化数据的存储和处理技术过于复杂的问题,同时Kudu能够与Hadoop生态的其他组件比如Impala、Spark、Flume和Kafka等组件集成,大大降低了对...

    Spark Streaming 流式处理项目代码.rar

    而流处理则是直接对运动中的数据的处理,在接收数据时直接计算数据。 大多数数据都是连续的流:传感器事件,网站...+ Spark Streaming 可以从 HDFS,Flume,Kafka,Twitter 和 ZeroMQ 读取数据,也支持自定义数据源。

    全新大数据企业电商数据仓库项目实战教程

    Apache原生框架介绍中涉及到的技术框架包括Flume、Kafka、Sqoop、MySql、HDFS、Hive、Tez、Spark、Presto、Druid等,CDH版本框架讲解包括CM的安装部署、Hadoop、Zookeeper、Hive、Flume、Kafka、Oozie、Impala、HUE...

    大数据—电商数仓项目

    CDH版本框架讲解包括CM的安装部署、Hadoop、Zookeeper、Hive、Flume、Kafka、Oozie、Impala、HUE、Kudu、Spark的安装配置,透彻了解不同版本框架的区别联系,将大数据全生态系统前沿技术一网打尽。在过程中对大数据...

    大数据培训视频.zip

    目录 1.1_大数据时代 1.2_大数据的应用领域-大数据解决方案 2.1_HDFS概述及应用场景-HDFS系统架构 2.2_关键特性介绍 3.1_MapReduce和Yarn基本介绍-MapReduce和Yarn功能...11.1_Kafka简介-Kafka架构与功能 .............

    华为HCIA-Big Data V2.0 LVC公开课培训.rar

    11.2_Kafka架构与功能-Kafka关键流程 第十二章 ZooKeeper集群分布式协调服务 12.1_ZooKeeper简介-与组件的关系 第十三章 FusionInsight HD 解决方案介绍 13.1_FusionInsight概述-FusionInsight特性介绍 13.2_...

    2017最新大数据架构师精英课程

    本资源为大数据基础到中...159_kafka与flume集成-source集成- _, G+ K) y% I4 D" q9 \ 160_kafka与flume集成-sink集成4 o6 W; v5 a; p9 s. X% I7 @ 161_kafka与flume集成-channel集成/ x' w3 g3 z& d: w 162_kafka简介!

    程序代码.rar

    flume+springboot+kafka+sparkStream+mysql集成的代码,对应系列步骤:https://blog.csdn.net/mojir/article/details/95667896

    HCIA-Big Data V2.0视频.zip

    目录网盘文件永久链接 1.1 大数据的定义及其应用领域 ...8.2 Flink的底层原理和集成情况 9.1 Loader数据转换 10.1 Flume的关键流程和特性 10.2 Flume操作实例 11.1 Kafka分布式消息订阅系统 .........

    华为HCIA-Big Data V2.0 LVC公开课培训视频教程【共25集】.rar

    8.2.1 Flink的底层原理和集成情况 9.1.1 Loader数据转换 10.1.1 Flume的关键流程和特性 10.2.1 Flume操作实例 11.1.1 Kafka分布式消息订阅系统 12.1.1 ZooKeeper集群分布式协调服务 13.1.1 FusionInsight HD ...

Global site tag (gtag.js) - Google Analytics