kafka大数据采集技术实验(未完待续)

Kafka环境搭建

  1. 下载地址:https://link.zhihu.com/?target=https%3A//kafka.apache.org/downloads
  2. 解压
  3. 启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

需要注意的是 : " c o n f i g / z o o k e e p e r . p r o p e r t i e s " 目录和 " / c o n f i g / z o o k e e p e r . p r o p e r t i e s " 目录是不同的 . 前者指当前目录中 c o n f i g 目录下的 z o o k e e p e r . p r o p e r t i e s 文件, 后者代表根目录中 c o n f i g 目录下的 z o o k e e p e r . p r o p e r t i e s 文件。 \color{red}需要注意的是:\\ "config/zookeeper.properties"目录和 "/config/zookeeper.properties"目录是不同的.\\ 前者指当前目录中config目录下的zookeeper.properties文件,\\ 后者代表根目录中config目录下的zookeeper.properties文件。 需要注意的是:"config/zookeeper.properties"目录和"/config/zookeeper.properties"目录是不同的.前者指当前目录中config目录下的zookeeper.properties文件,后者代表根目录中config目录下的zookeeper.properties文件。

若启动不成功,需要将zookeeper.properties中的admin.EnableServer=false修改为admin.EnableServer=true
或者关闭zookeeper并重新启动:

bin/zookeeper-server-stop.sh 
  1. 启动kafka
bin/kafka-server-start.sh config/server.properties
  1. 创建topic
kafka-topics.sh --create --zookeeper cluster1:9092,cluster2: 9092,cluster3: 9092--replication-factor 3 --partitions 1 --topic ljg

若发生错误:”zookeeper is not a recognized option”则将参数换成“—BOOTSTRAP-SERVER”,即:

./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic ljg

上述两者的区别是,–zookeeper 和cluster都是老版本的命令参数,新版本可能不再支持。

  1. 创建生产者
kafka-console-producer.sh --broker-list cluster1:9092 --topic ljg

或者:

./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic ljg

上述两者的区别是,–broker-list 和cluster都是老版本的命令参数,新版本可能不再支持。

  1. 创建消费者
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ljg

此时生产者即可进入等待输入,并将消息发送给消费者。

在这里插入图片描述

2181端口用于管理Kafka集群的元数据信息,包括Kafka的配置信息、分区信息、消费者信息等。而9092端口是Kafka Broker的默认端口,用于接收和处理生产者和消费者的消息,以及进行数据的存储和传输。

参考链接:https://www.cnblogs.com/anquing/p/14523046.html

maven下载安装、设置

  1. 下载解压
  2. 设置工作目录
  3. 设置镜像
  4. 编译java项目

mave命令:

mvn clean:清理
mvn compile:编译主程序
mvn test-compile:编译测试程序
mvn test:执行测试
mvn package:打包
mvn install:安装

maven项目目录结构:
在这里插入图片描述

Hello.java内容:

package com.maven.test;


public class hello {
  public String sayHello(String name){
    return "Hello "+name+"!";
  }
}

注意pom.xml中的name 和artifactId字段的区别。
pom.xml例子:

<?xml version="1.0" ?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.maven.test</groupId>
    <artifactId>hello</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <name>hello</name>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

数据库安装和操作

安装mabiadb数据库

yum install mariadb-server 
systemctl start mariadb
systemctl enable mariadb
mysql_secure_installation

创建数据表:

mysql -uroot -p
create database kafkaTestDB;
use kafkaTestDB;
create table kafkaTestTable(tickcount varchar(64), value  varchar(64),time varchar(64));

java连接和操作数据库:

package Main;

import java.sql.*;

public class JDBC {
    public static void main(String[] args) throws SQLException, ClassNotFoundException {
//        1.加载驱动
        Class.forName("com.mysql.cj.jdbc.Driver");
//        2.用户信息和url
        String url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=true";
        String username="root";
        String password="root";
//        3.连接成功,数据库对象 Connection
        Connection connection = DriverManager.getConnection(url,username,password);
//        4.执行SQL对象Statement,执行SQL的对象
        Statement statement = connection.createStatement();
//        5.执行SQL的对象去执行SQL,返回结果集
        String sql = "SELECT *FROM studentinfo;";
        ResultSet resultSet = statement.executeQuery(sql);
        while(resultSet.next()){
            System.out.println("SNo="+resultSet.getString("SNo"));
            System.out.println("SName="+resultSet.getString("SName"));
            System.out.println("Birth="+resultSet.getString("Birth"));
            System.out.println("SPNo="+resultSet.getString("SPNo"));
            System.out.println("Major="+resultSet.getString("Major"));
            System.out.println("Grade="+resultSet.getString("Grade"));
            System.out.println("SInstructor="+resultSet.getString("SInstructor"));
            System.out.println("SPwd="+resultSet.getString("SPwd"));
        }
//        6.释放连接
        resultSet.close();
        statement.close();
        connection.close();
    }
}


JAVA代码(maven)构建生产者消费者

工程目录:

在这里插入图片描述

pom.xml:


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
 
    <groupId>com.bjtu.kafkaTest</groupId>
    <artifactId>kafkaTest</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
 
    <dependencies>
 
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.2.1</version>
        </dependency>
 
        <dependency>
            <groupId> org.apache.cassandra</groupId>
            <artifactId>cassandra-all</artifactId>
            <version>0.8.1</version>
 
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>8.0.26</version>
</dependency>

<dependency>
   <groupId>joda-time</groupId>
   <artifactId>joda-time</artifactId>
   <version>2.9.8</version>
</dependency>

<dependency>
    <groupId>org.json</groupId>
    <artifactId>json</artifactId>
    <version>20180130</version>
</dependency>

    </dependencies>
 
</project>


java源码:

consumer:

package com.bjtu.kafkaTest;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
 
import java.util.Arrays;

import java.sql.*;
import java.util.Properties;
import org.json.*;
 




public class ConsumerDemo {

	


    public static void mysqlAccess(String tick,String value,String t)  {

	try{

		Class.forName("com.mysql.cj.jdbc.Driver");

		String url = "jdbc:mysql://localhost:3306/";

		String username="root";
		String password="1234";

		Connection connection = DriverManager.getConnection(url,username,password);

		Statement statement = connection.createStatement();

		String sql = "use kafkaTestDB;";
		ResultSet resultSet = statement.executeQuery(sql);
		//sql = "SELECT *FROM kafkaTestTable;";
		//resultSet = statement.executeQuery(sql);
		//while(resultSet.next()){
		//    System.out.println("tickcount:"+resultSet.getString("tickcount"));
		//}
		sql = "insert into kafkaTestTable values(" + tick +"," + value + "," + t + ")";
		resultSet = statement.executeQuery(sql);
		System.out.println(sql);
	
		resultSet.close();
		statement.close();
		connection.close();
	}
	catch(Exception e){
		System.out.println(e.toString());
	}

    }






    public static void main(String[] args){

	System.out.println("consumer start\r\n");
	Statement statement = null;
	Connection connection = null;
	ResultSet resultSet = null;
	try{

		Class.forName("com.mysql.cj.jdbc.Driver");

		String url = "jdbc:mysql://localhost:3306/";

		String username="root";
		String password="1234";

		connection = DriverManager.getConnection(url,username,password);

		statement = connection.createStatement();

		String sql = "use kafkaTestDB;";
		resultSet = statement.executeQuery(sql);
		//sql = "SELECT *FROM kafkaTestTable;";
		//resultSet = statement.executeQuery(sql);
		//while(resultSet.next()){
		//    System.out.println("tickcount:"+resultSet.getString("tickcount"));
		//}

	}
	catch(Exception e){
		System.out.println(e.toString());
		return;
	}

        Properties properties = new Properties();
      	properties.put("bootstrap.servers", "0.0.0.0:9092");
    
        properties.put("group.id", "zabbix_perf");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        /**
         * earliest
         *   当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
         *   latest
         *   当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
         *   none
         *   topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
         *
         */
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        /**
         * 反序列化
         * 把kafka集群二进制消息反序列化指定类型。
         */
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Arrays.asList("ljg"));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);//100是超时时间

	
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, value = %s", record.offset(), record.value());
		

		JSONObject jo = new JSONObject(record.value());
		try{
			String tick = jo.getString("tickcount");
			String value = jo.getString("value");
			String t = jo.getString("time");
			//mysqlAccess(tick,value,t);
			//System.out.println("tick:"+tick + ",value:"+value + ",time:"+t);


			String sql = "insert into kafkaTestTable values(\"" + 
	tick + "\",\"" + value + "\",\"" + t + "\")";
			System.out.println(sql);
			int result = statement.executeUpdate(sql);
			
	

		}catch(Exception e){
			System.out.println(e.toString());
			break;
		}

            }
        }
 

	//try{
	//resultSet.close();
	//statement.close();
	//connection.close();
	//}catch(Exception e){
	//	System.out.println(e.toString());
	//}
    }

}


producer:

package com.bjtu.kafkaTest;
 
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
 
import java.util.Properties;
import java.util.Random;
import java.util.Date;

import java.time.LocalDate;
import org.joda.time.DateTime;

import java.text.SimpleDateFormat;



public class ProducerDemo {

    public static void main(String[] args){

	System.out.println("producer start\r\n");
 
        Properties properties = new Properties();
        /**
         *bootstrap.server用于建立到Kafka集群的初始连接的主机/端口对的列表,如果有两台以上的机器,逗号分隔
         */
        properties.put("bootstrap.servers", "0.0.0.0:9092");
        /**
         * acks有三种状态
         * acks=0 不等待服务器确认直接发送消息,无法保证服务器收到消息数据
         * acks=1 把消息记录写到本地,但不会保证所有的消息数据被确认记录的情况下进行释放
         * acks=all 确认所有的消息数据被同步副本确认,这样保证了记录不会丢失
         *
         */
        properties.put("acks", "all");
        /**
         * 设置成大于0将导致客户端重新发送任何发送失败的记录
         *
         */
        properties.put("retries", 0);
        /**
         *16384字节是默认设置的批处理的缓冲区
         */
        properties.put("batch.size", 16384);
 
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        /**
         * 序列化类型。
         * kafka是以键值对的形式发送到kafka集群的,key是可选的,value可以是任意类型,Message再被发送到kafka之前,Producer需要
         * 把不同类型的消息转化成二进制类型。
         */
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = null;
        try {
            producer = new KafkaProducer<String, String>(properties);

		Random r = new Random();

            for (int i = 0; i < 1000; i++) {

		int rv = r.nextInt(0x10000000);

		long timestamp = System.currentTimeMillis();
		Date date = new Date(timestamp + rv);
		SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
		String formattedDate = sdf.format(date);
		//System.out.println("格式化后的日期:" + formattedDate);


                String msg = "{\"tickcount\":\"" + formattedDate + "\",\"value\":\"" +formattedDate +"\",\"time\":\"" 			+ formattedDate + "\"}" ;

                producer.send(new ProducerRecord<String, String>("ljg", msg));

                System.out.println("Sent:" + msg);

		//Thread.sleep(1);
            }
        } catch (Exception e) {
            e.printStackTrace();
 
        } finally {
            producer.close();
        }
 
	System.out.println("producer end\r\n");
    }
}

安装依赖包:

mvn idea:module

编译运行:

在这里插入图片描述

执行:

mvn exec:java -Dexec.mainClass="com.bjtu.kafkaTest.ConsumerDemo"


mvn exec:java -Dexec.mainClass="com.bjtu.kafkaTest.ProducerDemo"

参考链接:
https://www.cnblogs.com/qqran/p/14772713.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/572578.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

维态思(上海)环保科技有限公司 | 2024全国水科技大会暨技术装备成果展览会

嘉宾简介 胡建龙 维态思&#xff08;上海&#xff09;环保科技有限公司 总经理 报告题目&#xff1a;微生态滤床 植物工厂——小城镇生活污水生态净化及零排放案例分享 国家注册设备工程师&#xff08;给排水&#xff09;、上海市&#xff08;合作交流&#xff09;五四青年…

BUUCTF---misc---[ACTF新生赛2020]outguess

1、下载附件&#xff0c;解压之后得到下面信息 2、查看图片属性&#xff0c;发现有个核心价值观编码&#xff1b;解码为abc 3、flag.txt提示 4、结合题目&#xff0c;这是一个outguess隐写 5、用kali先下载安装隐写库 6、使用命令-k(密钥)&#xff1b;-r(将图片里面的隐写信息…

InstantMesh:利用稀疏视图大规模重建模型从单张图像高效生成3D网格

作者&#xff1a;Jiale Xu&#xff0c;Weihao Cheng&#xff0c;Yiming Gao等 编译&#xff1a;东岸因为一点人工一点智能 InstantMesh&#xff1a;利用稀疏视图大规模重建模型从单张图像高效生成3D网格在这项工作中&#xff0c;我们提出了InstantMesh&#xff0c;一个开源的…

免费在英伟达官网使用多个开源AI大模型

英伟达官网能体验到多个聊天AI和图片生成AI&#xff0c;不废话直接上链接 AI开源大模型&#xff08;https://build.nvidia.com/explore/discover?api-keytrue&#xff09; 开源的AI大模型有meta的llama3-8b和llama3-70b、snowflake的arctic、microsoft的phi-3-mini、mistral…

【Linux系统编程】第九弹---权限管理操作(下)

✨个人主页&#xff1a; 熬夜学编程的小林 &#x1f497;系列专栏&#xff1a; 【C语言详解】 【数据结构详解】【C详解】【Linux系统编程】 目录 1、目录权限 2、粘滞位 总结 1、目录权限 首先提出一个问题&#xff0c;删除一个文件需要什么权限呢&#xff1f;&#xff1f…

虚拟机软件哪个好用 虚拟机软件哪个可以玩暗区突围 虚拟机软件排名 PD19虚拟机 Mac类虚拟机运行Windows程序 CrossOver支持的热门游戏

随着跨系统互联的需求不断增长&#xff0c;越来越多的用户会选择在电脑系统中安装虚拟机软件&#xff0c;进而更加便捷地访问和操作其他系统。一款好用的虚拟机软件能够提高系统互联的效率&#xff0c;进而实现了资源共享、测试环境搭建等多种用途。而在众多的虚拟机软件当中&a…

张驰咨询:降本增效企业突破市场重围的制胜法宝

企业在快速发展的过程中&#xff0c;降本增效是永恒不变的主题。毕竟&#xff0c;在竞争激烈的市场环境中&#xff0c;只有不断提高效率和降低成本&#xff0c;才能在竞争中立于不败之地。那么&#xff0c;为什么企业需要降本增效呢&#xff1f; 首先&#xff0c;降本增效是企业…

vue+springboot的登录图片验证码(前端对接报错)

tip:这个只是一个效果实际要运用&#xff0c;还是需要改改滴&#xff01; 后台Java自带的 本来我是打算用第三方库的&#xff0c;没有整出来&#xff0c;就跟沈某人说不会来着&#xff0c;他说最好用Java自带的&#xff0c; 不然换个系统第三方的就不能用了&#xff0c;大概…

不可以论文查重,也包含了查AI率吗?

临近毕业&#xff0c;完成一篇符合学术规范的毕业论文是一项繁琐又具挑战性的任务。撰写完论文后&#xff0c;反复的查重降重已让人心身疲累。今年&#xff0c;学校又提出了新要求&#xff0c;论文还需要通过AIGC检测系统&#xff08;www.checkaigc.com&#xff09;才行&#x…

Vue2学习笔记(尚硅谷天禹老师)

目录 一、入门案例 二、模板语法 三、数据绑定 四、el和data的两种写法 五、MVVM模型 六、Object.defineproperty方法 七、Vue中响应式原理 八、数据代理 九、methods配置项 十、Vue中的事件处理 十一、Vue中的键盘事件 十二、计算属性 十三、监视属性watch 十四、绑定Class样式…

【echarts】数据起点不从X轴的原点开始【不从0开始】

echarts折线图x轴不从0开始怎么办&#xff1f; 或者说为什么有些图是这样的 有些却是这样的 原因出在这里&#xff1a; boundaryGap: false 默认是true&#xff0c;是指坐标轴两边留白。改为false&#xff1a;不留白即从原点开始。 看一下官方的说明

中小型企业网络实战topo

1、设备命名&#xff0c;务必按照规范进行命名规划&#xff1b; 2、子网划分&#xff0c;申请到了公网地址段&#xff0c;201.1.1.0/24&#xff0c;根据公司的实际情况&#xff0c;合理规划拓扑需要的公网地址&#xff0c; 做到合理规划不浪费&#xff1b; 3、子网划分&a…

嵌入式开发学习--进程、线程

什么是进程 进程和程序的区别 概念 程序&#xff1a;编译好的可执行文件&#xff0c;存放在磁盘上的指令和数据的有序集合&#xff08;文件&#xff09;&#xff0c;程序是静态的&#xff0c;没有任何执行的概念。 进程&#xff1a;一个独立的可调度的任务&#xff0c;执行一…

做抖音小店如何选品?这几个技巧,精准“锁定”爆品!

哈喽~我是电商月月 做抖音小店最重要的就是选品&#xff0c;这点大家都知道 一个店铺商品选的好&#xff0c;顾客喜欢&#xff0c;质量完好&#xff0c;销量和售后都不用操心&#xff0c;和达人合作时&#xff0c;爆单的机会也就越高 那这种商品是什么样的&#xff0c;新手开…

基于ssm微信小程序的4S店客户管理系统

采用技术 基于ssm微信小程序的4S店客户管理系统的设计与实现~ 开发语言&#xff1a;Java 数据库&#xff1a;MySQL 技术&#xff1a;SpringMVCMyBatis 工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 页面展示效果 管理员端 管理员登录 管理员首页 用户管理 门店管理 …

RustGUI学习(iced)之小部件(一):如何使用按钮和文本标签部件

前言 本专栏是学习Rust的GUI库iced的合集&#xff0c;将介绍iced涉及的各个小部件分别介绍&#xff0c;最后会汇总为一个总的程序。 iced是RustGUI中比较强大的一个&#xff0c;目前处于发展中&#xff08;即版本可能会改变&#xff09;&#xff0c;本专栏基于版本0.12.1. 概述…

高效一键改写文章,智能伪原创工具轻松搞定

在信息爆炸的时代&#xff0c;想要高效率的一键改写文章却是很多创作者都想了解的方法。然而在人工智能技术发展的今天&#xff0c;智能伪原创工具的出现&#xff0c;也正是成了广大创作者用来一键改写文章的好方法&#xff0c;因为它的优势&#xff0c;可以为大家轻松完成改写…

光伏二次设备主要有哪些

光伏电站二次设备类型比较多&#xff0c;信息显示、数据安全、远动通信、电能质量、微机保护等都有不同设备相互配合完成&#xff0c;根据项目具体需求来选择&#xff0c;简单可以分为以下几种&#xff1a; 一、光伏二次设备保护屏&#xff1a; 1、光伏二次设备预制舱 二次设…

短视频矩阵系统源码====3年技术公司源头开发商交付

短视频矩阵系统#源头技术打磨 哈尔滨爆火带动了一波“北上热潮”&#xff0c;各地文旅坐不住了&#xff0c;兄弟们开“卷”&#xff01;这波互卷浪潮中&#xff0c;河南率先出圈。如今&#xff0c;河南文旅账号粉丝已经突破200w&#xff01; 01 矩阵打法&#xff0c;很难不火…

超越边界:如何ChatGPT 3.5、GPT-4、DALL·E 3和Midjourney共同重塑创意产业

KKAI&#xff08;kkai人工智能&#xff09;是一个整合了多种尖端人工智能技术的多功能助手平台&#xff0c;融合了OpenAI开发的ChatGPT3.5、GPT4.0以及DALLE 3&#xff0c;并包括了独立的图像生成AI—Midjourney。以下是这些技术的详细介绍&#xff1a; **ChatGPT3.5**&#xf…