妈耶,一个简简单单的功能,废了我大半天功夫,反思了一下原因,1是自己之前学的东西记不清了,各种配置搞的很晕,看了半天,2是网上的教程实在不堪入目,跟着走让人云里雾里,费半天功夫出不来结果。
仅此,详细告诉大家流程及结果演示。
一、安装kafka 和 flume(mac的,如果是其他的,请从网上自己找一下)
MAC安装Kafka 安装kafka时会默认安装zookeeper
MAC安装Flume
二、启动zookeeper
nohup zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &
三、启动kafka,创建kafka topic
nohup kafka-server-start /usr/local/etc/kafka/server.properties &
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo
四、进入flume目录,创建job文件夹,然后创建flume的配置文件
touch job
cd job
vim flume-to-kafka.conf
然后写入配置:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers=localhost:9092
a1.sinks.k1.kafka.topic=demo
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
a1.sinks.k1.kafka.producer.acks=1
a1.sinks.k1.custom.encoding=UTF-8
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
五、选做(将上面配置的avro先改为netcat,通过nc测试flume是否连接到了kafka)
修改完上面的,然后启动flume:
bin/flume-ng agent -n a1 -c libexec/conf -f job/flume-to-kafka.conf
然后新建一个窗口,查看flume是否连接到了kafka:
如果看到上面,通过nc发送的数据,在kafka能接受到,说明flume连接kafka成功!
六、 如果做了五,将netcat改回avro,如果没做五,请忽略
启动flume:
bin/flume-ng agent -n a1 -c libexec/conf -f job/flume-to-kafka.conf
七、创建springboot项目
1、pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.bupt</groupId>
<artifactId>realcaldemo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>realcaldemo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<!--排除这个slf4j-log4j12-->
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flume.flume-ng-clients</groupId>
<artifactId>flume-ng-log4jappender</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.6.6</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<!--排除这个slf4j-log4j12-->
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2、新建log4j.properties
log4j.properties文件如下:
log4j.rootLogger=info,console,flume
# console appender config
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p [%c:%L] - %m%n
# flume appender config
log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname=localhost
log4j.appender.flume.Port=44444
log4j.appender.flume.UnsafeMode=false
log4j.appender.flume.layout=org.apache.log4j.PatternLayout
log4j.appender.flume.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p [%c:%L] - %m%n
3、随便创建一个类测试一下
package com.bupt.realcaldemo.test;
import org.apache.log4j.Logger;
public class Testlog {
protected static final Logger LOG=Logger.getLogger(Testlog.class);
public static void main(String[] args) {
// LOG.debug("这是一条debug级别的日志!");
LOG.info("这是一条info级别的日志!");
// LOG.error("这是一条error级别的日志!");
// LOG.fatal("这是一条fatal级别的日志!");
}
}
共有条评论 网友评论