SpringBoot 封装 HBase 操作工具类
创始人
2024-02-24 03:38:58
0

        最近项目中用到了 HBase 相关的操作并封装成工具类,我的 HBase 服务器端版本是 2.1.0,图示如下:

        特此记录便于日后查阅。

一、pom.xml 依赖

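<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-shaded-client</artifactId>
    <version>2.1.0</version>
</dependency>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.0.0</version>
</dependency>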

二、application.yml 项目配置

        此处我是自定义HBase配置,后面会有专门的配置类来加载这个配置

datasource:
  hbase:
    zookeeper:
      port: 2181
      quorum: 10.0.61.12,10.0.61.22,10.0.61.24
      znode:
        parent: ''

三、HbaseConfig 自定义配置类

import lombok.Data;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;import java.io.IOException;/*** @description: Hbase配置类* @author: zhangzhixiang* @createDate: 2022/11/24* @version: 1.0*/
/**
 * Holds the HBase ZooKeeper settings injected from {@code datasource.hbase.zookeeper.*}
 * and builds HBase client connections from them.
 */
@Data
@Component
@Configuration
public class HbaseConfig {

    /** Value of {@code datasource.hbase.zookeeper.quorum}. */
    @Value("${datasource.hbase.zookeeper.quorum}")
    private String zookeeper;

    /** Value of {@code datasource.hbase.zookeeper.znode.parent}; ignored when null or empty. */
    @Value("${datasource.hbase.zookeeper.znode.parent}")
    private String parent;

    /** Value of {@code datasource.hbase.zookeeper.port}, passed to the client as a string. */
    @Value("${datasource.hbase.zookeeper.port}")
    private String port;

    /**
     * Builds a client configuration from the injected properties and opens a connection with it.
     * Every call goes through {@link ConnectionFactory#createConnection}; nothing is cached here.
     *
     * @return the connection created by {@link ConnectionFactory}
     * @throws IOException if the connection cannot be created
     */
    public Connection getConnection() throws IOException {
        org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create();
        hbaseConf.set("hbase.zookeeper.quorum", zookeeper);
        hbaseConf.set("hbase.zookeeper.property.clientPort", port);

        // Only override the znode parent when one was actually configured.
        boolean parentConfigured = parent != null && !parent.isEmpty();
        if (parentConfigured) {
            hbaseConf.set("zookeeper.znode.parent", parent);
        }

        return ConnectionFactory.createConnection(hbaseConf);
    }
}

四、HbaseUtil 工具类

        首先添加 SpringContext 工具类,下面会用到:

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;/*** @Description: * @Author:zhangzhixiang* @Date: 2022/7/25* @Version: 1.0*/
@Component
public class SpringContext implements ApplicationContextAware {public static ApplicationContext context;public static Environment env;@Overridepublic void setApplicationContext(ApplicationContext context) throws BeansException {SpringContext.context = context;SpringContext.env = context.getEnvironment();}public static Object getBean(String name) {return context.getBean(name);}public static  T getBean(Class clazz) {return context.getBean(clazz);}public static ApplicationContext getContext() {return context;}public static Environment getEnv() {return env;}public static String getProperty(String key) {return getProperty(key, "");}public static String getProperty(String key, String defaultValue) {return env.getProperty(key, defaultValue);}public static  T getProperty(String key, Class targetType) {return env.getProperty(key, targetType);}public static String getActiveProfile() {return env.getActiveProfiles()[0];}
}

         然后我们来写 HbaseUtil 工具类的代码:

import com.swkj.common.base.context.SpringContext;
import com.swkj.common.base.log.GLog;
import com.swkj.common.base.log.LogFactory;
import com.swkj.common.hbase.config.HbaseConfig;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;/*** @description: Hbase工具类* @author: zhangzhixiang* @createDate: 2022/11/24* @version: 1.0*/
@DependsOn("springContext")
@Component
public class HbaseUtil {private static final GLog LOG = LogFactory.getLogger(HbaseUtil.class);private static HbaseConfig hbaseConfig = (HbaseConfig) SpringContext.getBean("hbaseConfig");private static Connection connection = null;private static Admin admin = null;private HbaseUtil() {if (connection == null) {try {connection = hbaseConfig.getConnection();admin = connection.getAdmin();} catch (IOException e) {LOG.error("HbaseUtils实例初始化失败!错误信息为:" + e.getMessage(), e);}}}/*** 创建表** @param tableName    表名* @param columnFamily 列族(数组)*/public void createTable(String tableName, String[] columnFamily) throws IOException {TableName name = TableName.valueOf(tableName);//如果存在则删除if (admin.tableExists(name)) {admin.disableTable(name);admin.deleteTable(name);LOG.error("create htable error! this table {} already exists!", name);} else {HTableDescriptor desc = new HTableDescriptor(name);for (String cf : columnFamily) {desc.addFamily(new HColumnDescriptor(cf));}admin.createTable(desc);}}/*** 插入记录(单行单列族-多列多值)** @param tableName     表名* @param row           行名* @param columnFamilys 列族名* @param columns       列名(数组)* @param values        值(数组)(且需要和列一一对应)*/public void insertRecords(String tableName, String row, String columnFamilys, String[] columns, String[] values) throws IOException {TableName name = TableName.valueOf(tableName);Table table = connection.getTable(name);Put put = new Put(Bytes.toBytes(row));for (int i = 0; i < columns.length; i++) {put.addColumn(Bytes.toBytes(columnFamilys), Bytes.toBytes(columns[i]), Bytes.toBytes(values[i]));table.put(put);}}/*** 插入记录(单行单列族-单列单值)** @param tableName    表名* @param row          行名* @param columnFamily 列族名* @param column       列名* @param value        值*/public void insertOneRecord(String tableName, String row, String columnFamily, String column, String value) throws IOException {TableName name = TableName.valueOf(tableName);Table table = connection.getTable(name);Put put = new 
Put(Bytes.toBytes(row));put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));table.put(put);}/*** 删除一行记录** @param tablename 表名* @param rowkey    行名*/public void deleteRow(String tablename, String rowkey) throws IOException {TableName name = TableName.valueOf(tablename);Table table = connection.getTable(name);Delete d = new Delete(rowkey.getBytes());table.delete(d);}/*** 删除单行单列族记录** @param tablename    表名* @param rowkey       行名* @param columnFamily 列族名*/public void deleteColumnFamily(String tablename, String rowkey, String columnFamily) throws IOException {TableName name = TableName.valueOf(tablename);Table table = connection.getTable(name);Delete d = new Delete(rowkey.getBytes()).addFamily(Bytes.toBytes(columnFamily));table.delete(d);}/*** 删除单行单列族单列记录** @param tablename    表名* @param rowkey       行名* @param columnFamily 列族名* @param column       列名*/public void deleteColumn(String tablename, String rowkey, String columnFamily, String column) throws IOException {TableName name = TableName.valueOf(tablename);Table table = connection.getTable(name);Delete d = new Delete(rowkey.getBytes()).addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));table.delete(d);}/*** 查找一行记录** @param tablename 表名* @param rowKey    行名*/public static String selectRow(String tablename, String rowKey) throws IOException {String record = "";TableName name = TableName.valueOf(tablename);Table table = connection.getTable(name);Get g = new Get(rowKey.getBytes());Result rs = table.get(g);NavigableMap>> map = rs.getMap();for (Cell cell : rs.rawCells()) {StringBuffer stringBuffer = new StringBuffer().append(Bytes.toString(cell.getRowArray())).append("\t").append(Bytes.toString(cell.getFamilyArray())).append("\t").append(Bytes.toString(cell.getQualifierArray())).append("\t").append(Bytes.toString(cell.getValueArray())).append("\n");String str = stringBuffer.toString();record += str;}return record;}/*** 查找单行单列族单列记录** @param tablename    表名* @param rowKey       
行名* @param columnFamily 列族名* @param column       列名* @return*/public static String selectValue(String tablename, String rowKey, String columnFamily, String column) throws IOException {TableName name = TableName.valueOf(tablename);Table table = connection.getTable(name);Get g = new Get(rowKey.getBytes());g.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));Result rs = table.get(g);return Bytes.toString(rs.value());}/*** 查询表中所有行(Scan方式)** @param tablename* @return*/public String scanAllRecord(String tablename) throws IOException {String record = "";TableName name = TableName.valueOf(tablename);Table table = connection.getTable(name);Scan scan = new Scan();ResultScanner scanner = table.getScanner(scan);try {for (Result result : scanner) {for (Cell cell : result.rawCells()) {StringBuffer stringBuffer = new StringBuffer().append(Bytes.toString(cell.getRowArray())).append("\t").append(Bytes.toString(cell.getFamilyArray())).append("\t").append(Bytes.toString(cell.getQualifierArray())).append("\t").append(Bytes.toString(cell.getValueArray())).append("\n");String str = stringBuffer.toString();record += str;}}} finally {if (scanner != null) {scanner.close();}}return record;}/*** 根据rowkey关键字查询报告记录** @param tablename* @param rowKeyword* @return*/public List scanReportDataByRowKeyword(String tablename, String rowKeyword) throws IOException {ArrayList list = new ArrayList<>();Table table = connection.getTable(TableName.valueOf(tablename));Scan scan = new Scan();//添加行键过滤器,根据关键字匹配RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(rowKeyword));scan.setFilter(rowFilter);ResultScanner scanner = table.getScanner(scan);try {for (Result result : scanner) {//TODO 此处根据业务来自定义实现list.add(null);}} finally {if (scanner != null) {scanner.close();}}return list;}/*** 根据rowkey关键字和时间戳范围查询报告记录** @param tablename* @param rowKeyword* @return*/public List scanReportDataByRowKeywordTimestamp(String tablename, String rowKeyword, Long minStamp, Long 
maxStamp) throws IOException {ArrayList list = new ArrayList<>();Table table = connection.getTable(TableName.valueOf(tablename));Scan scan = new Scan();//添加scan的时间范围scan.setTimeRange(minStamp, maxStamp);RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(rowKeyword));scan.setFilter(rowFilter);ResultScanner scanner = table.getScanner(scan);try {for (Result result : scanner) {//TODO 此处根据业务来自定义实现list.add(null);}} finally {if (scanner != null) {scanner.close();}}return list;}/*** 删除表操作** @param tablename*/public void deleteTable(String tablename) throws IOException {TableName name = TableName.valueOf(tablename);if (admin.tableExists(name)) {admin.disableTable(name);admin.deleteTable(name);}}
}

五、使用

        接下来只需要在项目业务类里注入 HbaseUtil 就可以使用了:

// Usage snippet: field-inject the HbaseUtil bean into a business class.
@Autowired
private HbaseUtil hbaseUtil;

        测试方法:

import com.swkj.common.hbase.utils.HbaseUtil;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;/*** @description: Hbase工具类测试* @author: zhangzhixiang* @createDate: 2022/11/24* @version: 1.0*/
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles(profiles = "local")
public class HbaseServiceTest {@Autowired
private HbaseUtil hbaseUtil;@Testpublic void testHbase() {try {hbaseUtil.createTable("Student", new String[]{"StuInfo", "Grades"});hbaseUtil.insertOneRecord("Student", "0001", "StuInfo", "name", "Tom Green");hbaseUtil.insertOneRecord("Student", "0002", "StuInfo", "Age", "18");System.out.println("=================" + hbaseUtil.selectValue("Student", "0001", "StuInfo", "name"));System.out.println("=================" + hbaseUtil.selectValue("Student", "0002", "StuInfo", "Age"));System.out.println("=================" + hbaseUtil.selectRow("Student", "0001"));System.out.println("=================" + hbaseUtil.selectRow("Student", "0002"));} catch (Exception e) {e.printStackTrace();}}
}

        到此 SpringBoot 封装 HBase 操作工具类介绍完成。

相关内容

热门资讯

【追踪】河南方城县一民办小学1... 界面新闻记者 | 张旭 界面新闻编辑 | 刘海川 历经两次延期开庭,2025年12月19日上午,...
多件法律案将提请全国人大常委会... 12月19日,全国人大常委会法制工作委员会举行发言人记者会介绍,十四届全国人大常委会第十九次会议将于...
汇源再发声明:已提起诉讼,要求... 北京汇源食品饮料有限公司(以下简称“北京汇源”)重整一案,受到社会各界的广泛关注。重整方案实施以来,...
俄外交部发言人:日方不负责任的... 俄罗斯外交部发言人扎哈罗娃18日表示,俄方始终关注日方加速重新军国主义化的危险性,其国防开支急剧增长...
支持家庭适老化改造 政策再加力 央视网消息:近日,多部门联合出台《关于增强消费品供需适配性进一步促进消费的实施方案》,明确提出:“优...
德龙汇能[000593]关于子... 本版导读 2025-12-20 2025-12-20 2025-12-20 2025...
政策优势显著,发展机遇更多,国... 【环球时报记者 郭媛丹 环球时报驻俄罗斯特派记者 肖新新 环球时报驻法国特约记者 董铭】12月18日...
锚定制度型开放 勇当内陆开放新... “市委十二届九次全会紧扣党中央‘稳步扩大制度型开放’的战略部署,为‘十五五’开放发展擘画了宏伟蓝图,...
世贸组织报告指出——国际贸易政... 日前,世贸组织发布《世贸组织贸易政策审查——国际贸易环境发展概览》,指出全球贸易政策环境在2024年...