Java代码将Mysql表数据导入HBase表
目录
一、项目目录介绍
二、主要接口方法与调用
三、全局配置文件
四、难点
1、面向对象OOP的设计理念
2、数据格式转化
3、依赖的需求与使用
五、执行方式和输出介绍
六、项目依赖
七、拓展
一、项目目录介绍
主要包含四个接口:Com、RDB、HBase、RDBToHBase和其实现类xxxImpl。
两个配置文件:log4j日志配置文件log4j.properties
和全局配置文件tranfer.properties
(具体见下面展示)
备注:这里用的关系型数据库是Mysql。即:RDB为Mysql
二、主要接口方法与调用
1、Com接口
一些通用方法
1 2 3 4 5
| void close() void init() Properties config() default String checkAndGetConfig(String key){} default void closeAll(AutoCloseable...acs){}
|
2、RDB接口:继承Com接口
其实现类主要实现:关系型数据库连接,执行sql语句将返回的结果集转化为Put对象
1 2
| boolean hashNextBatch() throws SQLException; List<Put> nextBatch() throws SQLException;
|
3、HBase接口:继承Com接口
其实现类主要实现:连接HBase,获取HBase表,上传数据
1
| void putBatch(List<Put> batch) throws IOException;
|
4、RDBToHBase接口
其实现类主要实现:将RDB数据导入HBase主体逻辑
1 2 3
| void setRDB(RDB rdb); void setHBase(HBase base); void startTransfer();
|
5、主要调用逻辑
- 最终调用逻辑 start 方法
1 2 3 4 5 6 7 8 9 10 11 12
| RDB rdb = new RDBImpl(config);
HBase hBase = new HBaseImpl(config);
RDBToHBase rdbToHBase = new RDBToHBaseImpl();
rdbToHBase.setRDB(rdb);
rdbToHBase.setHBase(hBase);
rdbToHBase.startTransfer();
|
- rdbToHBase对象的startTransfer(迁移数据)方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| @Override public void startTransfer() { try { rdb.init(); loggerRH.info("RDB 初始化成功"); hbase.init(); loggerRH.info("HBase 初始化成功");
loggerRH.info("数据从 RDB 迁移至 HBase 开始..."); int count = 0; while (rdb.hashNextBatch()){ final List<Put> batch = rdb.nextBatch(); hbase.putBatch(batch); loggerRH.info(String.format("第 %d 批:%d 条数据插入成功", ++count, batch.size())); } loggerRH.info("数据从 RDB 迁移至 HBase 结束..."); }catch (Exception e){ loggerRH.error("将 RDB 数据批量迁移至 HBase 异常:",e); } finally { hbase.close(); rdb.close(); } }
|
三、全局配置文件
transfer.properties配置文件按需求设置如下参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| rdb.driver=com.mysql.cj.jdbc.Driver
rdb.url=jdbc:mysql://192.168.146.130:3306/test_db_for_bigdata
rdb.username=root
rdb.password=123456
rdb.batchSize=100
rdb.sql=select test_id,test_name,test_age,test_gender,test_phone from test_table1_for_hbase_import
rdb.hbase.columns.mapping=ROWKEY->test_id,baseInfo:name->test_name,baseInfo:age->test_age,baseInfo:gender->test_gender,baseInfo:phone->test_phone
hbase.table.name=hbase_test:tranfer_from_mysql
hbase.zk=192.168.146.130:2181
|
备注:可以按需求配置不同的mysql表导入hbase表
四、难点
1、面向对象OOP的设计理念
代码中不存在重复代码块,整体逻辑可以调用思路清晰
解决方式:提前总体的设计好接口继承关系等
实现方式:
2、数据格式转化
关系型数据库数据格式转化为HBase行键,列族,列,值的格式
解决方式:配置文件按一定格式配置映射关系,在写相应的代码逻辑解析
实现方式:用Map的方式存储映射关系,方便后续使用
3、依赖的需求与使用
依赖需求见:五、项目依赖
使用hbase,mysql连接对象操作mysql,hbase。hadoop上传数据的Put格式的参数与使用。zookeeper配置。
解决方式:了解每个对象的方法的作用和其参数
实现方式:
- 创建一个HBase连接对象需要设置ZooKeeper的地址
- 通过HBase连接对象获取HBase的Admin对象,用于执行DDL操作
- 数据库连接对象创建并返回一个PreparedStatement对象,用于执行预编译的SQL语句
- 等等…
备注:代码里含大量注释,感兴趣的可以下载代码查看
五、执行方式和输出介绍
1、运行项目配置
点击右上角编辑配置,添加Application配置好如下区域,点击Apply (应用)然后点击OK


2、运行结果输出
- 代码中含有详细日志信息可以在运行日志中查看报错信息,以及执行情况


- 在项目同路径下生成日志文件(可更改log4j.properties配置文件改变路径以及输出格式)

3、验证是否HBase表中数据是否正确添加
进入hbase命令行模式执行命令查看
1
| hbase shell # 进入hbase命令行模式
|
使用scan命令
1 2
| truncate 'hbase_test:tranfer_from_mysql' # 清空hbase_test命名空间下的tranfer_from_mysql表 scan 'hbase_test:tranfer_from_mysql' # 查看表
|

六、项目依赖
maven项目pom.xml文件依赖配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| <dependencies> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.29</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>2.3.5</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>3.1.3</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-auth</artifactId> <version>3.1.3</version> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.6.3</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> </dependencies>
|
七、拓展
1、增加数据源
类似RDB接口的设计与实现增加其他数据源
文件作为数据源
从提供的接口抓取每天的数据作为数据源
2、持续更新中