Maven dependencies:

```xml
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.6.3</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-sleuth</artifactId>
        <version>3.1.1</version>
    </dependency>
</dependencies>
```

To further reduce architectural complexity, you can also generate the traceId yourself instead of using Sleuth; see the earlier article for one implementation:
https://blog.csdn.net/qq_41633199/article/details/127482748?spm=1001.2014.3001.5502
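As a sketch of that idea, a per-request trace id can be generated and held in a thread-local, which a servlet filter or the logging MDC could then expose. The class and method names below are hypothetical, not the ones from the linked article:

```java
import java.util.UUID;

// Minimal sketch of hand-rolled trace-id propagation (an alternative to
// Sleuth). TraceIdUtil and its method names are hypothetical.
public class TraceIdUtil {

    // One trace id per request-handling thread.
    private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Called at the start of a request (e.g. from a servlet filter).
    public static String start() {
        String id = UUID.randomUUID().toString().replace("-", "");
        TRACE_ID.set(id);
        return id;
    }

    public static String current() {
        return TRACE_ID.get();
    }

    // Called when the request completes, to avoid leaks in thread pools.
    public static void clear() {
        TRACE_ID.remove();
    }
}
```

In a real setup the filter would also copy the id into the logging MDC so a pattern token such as `%X{traceId}` can print it in every log line.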


```shell
## Start ZooKeeper
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
```


```shell
## Start Kafka
bin\windows\kafka-server-start.bat config\server.properties

## Create the topic
bin\windows\kafka-topics.bat --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic app_log

## List topics
bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

# Produce messages
bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic app_log

# Consume messages
bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic app_log --group app_log --from-beginning
```

The newlines in Java exception stack traces break log parsing in the ClickHouse Kafka engine (each line gets parsed as a separate record, causing errors), so the FILE_FORMAT pattern strips newlines when writing to the log file.
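One way to fold a stack trace onto a single line is shown below; the class is an illustrative sketch (logback's `%replace` conversion word can achieve the same effect inside the pattern itself):

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Sketch: render a throwable's stack trace on one line so each log record
// maps to exactly one Kafka message. Class name is hypothetical.
public class StackTraceFlattener {

    public static String flatten(Throwable t) {
        StringWriter sw = new StringWriter();
        t.printStackTrace(new PrintWriter(sw, true));
        // Replace CR/LF runs (and the indentation that follows them)
        // with a visible separator.
        return sw.toString().replaceAll("[\\r\\n]+\\s*", " | ").trim();
    }
}
```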
```xml
<appender name="INFO_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <filter class="ch.qos.logback.classic.filter.LevelFilter">
        <level>INFO</level>
    </filter>
    <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
        <fileNamePattern>${LOG_HOME}/info/bill_log.%d{yyyy-MM-dd_HH}.log</fileNamePattern>
        <maxHistory>168</maxHistory>
        <cleanHistoryOnStart>true</cleanHistoryOnStart>
    </rollingPolicy>
    <encoder>
        <pattern>${FILE_FORMAT}</pattern>
    </encoder>
</appender>

<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
        <pattern>${CONSOLE_FORMAT}</pattern>
    </encoder>
</appender>
```

Download Filebeat:
https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.16.2-windows-x86_64.zip

Filebeat Kafka output documentation:
https://www.elastic.co/guide/en/beats/filebeat/7.16/kafka-output.html
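The FILE_FORMAT pattern referenced above is assumed to emit '|'-separated fields with newlines stripped from the exception text. A possible definition is sketched below; the field order matches the ClickHouse table used later, but this exact pattern is an assumption:

```xml
<!-- Assumed definition: '|'-delimited fields, exception folded onto one line -->
<property name="FILE_FORMAT"
          value="%d{yyyy-MM-dd HH:mm:ss.SSS}|%level|%X{traceId}|%thread|%logger|%method|%replace(%msg %ex){'[\r\n]+', ' '}%nopex%n"/>
```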

Configure Filebeat to ship logs to Kafka, making sure exactly one output section is enabled in filebeat.yml.
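A minimal Kafka output section for filebeat.yml could look like the following; the log path and all values are assumptions to adapt to your environment:

```yaml
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - D:\logs\info\*.log   # assumed LOG_HOME location

output.kafka:
  hosts: ["localhost:9092"]
  topic: "app_log"
  required_acks: 1
```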
```shell
filebeat -e -c filebeat.yml
```

Then call the application and verify that the messages are consumed from Kafka.

Because the ClickHouse container needs to reach Kafka running on the host machine, map `host.docker.internal` to the host IP (here 172.27.x.x) to allow communication.
```shell
docker run -d --network=bridge -p 8123:8123 -p 9000:9000 -p 9009:9009 --name clickhouse-svr --add-host="host.docker.internal:172.27.xx.x" clickhouse/clickhouse-server:24.4.3.25
```

Start the client, allowing direct selects from the Kafka engine table:

```shell
clickhouse-client --stream_like_engine_allow_direct_select 1
```

```sql
SET format_csv_delimiter = '|';

USE log;

CREATE TABLE LOG_KAFKA (
    time DateTime64(3, 'Asia/Shanghai'),
    level String,
    trace_id String,
    thread String,
    logger String,
    method String,
    msg String
) ENGINE = Kafka()
SETTINGS kafka_broker_list = 'host.docker.internal:9092',
    kafka_topic_list = 'app_log',
    kafka_group_name = 'app_log',
    kafka_num_consumers = 1,
    kafka_format = 'CSV',
    format_csv_delimiter = '|';

CREATE TABLE APP_LOG (
    time DateTime64(3, 'Asia/Shanghai'),
    level String,
    trace_id String,
    thread String,
    logger String,
    method String,
    msg String
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(time)
ORDER BY time;

-- Create a materialized view to pull data into the log table
CREATE MATERIALIZED VIEW vw_app_log TO APP_LOG AS
SELECT time, level, trace_id, thread, logger, method, msg FROM LOG_KAFKA;
```
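Once the materialized view is populating APP_LOG, a whole request trace can be pulled back with an ordinary query; the trace id value below is illustrative:

```sql
SELECT time, level, thread, logger, method, msg
FROM APP_LOG
WHERE trace_id = 'your-trace-id'   -- illustrative value
ORDER BY time;
```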
Install ClickVisual via Docker on Windows

Official documentation:
https://clickvisual.net/zh/clickvisual/02install/docker-installation.html

Pull the image:
https://hub.docker.com/r/clickvisual/clickvisual/tags

Download the config file locally:
https://github.com/clickvisual/clickvisual/tree/master/data/all-in-one/clickvisual/config

Configure the MySQL database connection.
*Make sure the MySQL user is allowed to connect by IP.

Create the container with the config directory mounted as a volume:

```shell
docker run --name clickvisual -e EGO_CONFIG_PATH=/clickvisual/config/docker.toml -e EGO_LOG_WRITER=stderr -p 19001:19001 -v D:\download\clickvisual\config:/clickvisual/config -d clickvisual/clickvisual:latest
```
The ClickHouse data source connection string has the format `clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60`. Since I did not enable authentication on ClickHouse, the `username:password@` part can be omitted.
After creating the instance, go back to the logs page, right-click the newly created instance, choose to attach an existing log library, and configure the log data table.
Call the null-pointer-exception endpoint from earlier, then refresh the ClickVisual page.
Querying by trace id: