当前位置: 首页 > news >正文

【hadoop】Hbase java api 案例

 代码实现:

HBaseConnection.java

package com.peizheng.bigdata;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;import java.io.IOException;public class HBaseConnection {public static Connection connection = null;static {Configuration conf = HBaseConfiguration.create();conf.set("hbase.zookeeper.quorum", "master,slave1,slave2");conf.set("hbase.zookeeper.property.clientPort","2181");try {connection = ConnectionFactory.createConnection(conf);} catch (IOException e) {e.printStackTrace();}}public static void closeConnection(){try {connection.close();} catch (IOException e) {e.printStackTrace();}}
}

HBaseOperation.java

package com.peizheng.bigdata;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;public class HBaseOperation {public static void createNameSpace(String namespace) throws IOException {// 1 获取admin对象   另有Table对象Admin admin = HBaseConnection.connection.getAdmin();// 1.1 Builder类NamespaceDescriptor.Builder builder = NamespaceDescriptor.create(namespace);// 1.2 添加需求,这里是添加了自定义的描述信息//builder.addConfiguration("user","peizheng");// 2 调用方法,创建命名空间admin.createNamespace(builder.build());// 3 关闭adminadmin.close();}public static void createTable(String name, String[] cols) throws IOException {Admin admin = HBaseConnection.connection.getAdmin();HTableDescriptor hTableDescriptor = new HTableDescriptor(name);for (String col : cols) {HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(col);hColumnDescriptor.setMaxVersions(5);hTableDescriptor.addFamily(hColumnDescriptor);}admin.createTable(hTableDescriptor);admin.close();}public static void putCell(String tableName, String rowKey, String columnFamily, String columnName, String value) throws IOException {Table table = HBaseConnection.connection.getTable(TableName.valueOf(tableName));Put put = new Put(Bytes.toBytes(rowKey));put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnName),Bytes.toBytes(value));table.put(put);table.close();}// 查询// 单行读取public static void getRow(String tableName, String rowKey) throws IOException {Table table = HBaseConnection.connection.getTable(TableName.valueOf(tableName));Get get = new Get(Bytes.toBytes(rowKey));// Result -> Cell[]Result result = table.get(get);// cell存储非常底层Cell[] cells = result.rawCells();for (Cell cell : cells) {String value = new String(CellUtil.cloneValue(cell));String family = new String(CellUtil.cloneFamily(cell));String colunm = new String(CellUtil.cloneQualifier(cell));System.out.println(family + ":" + colunm + "," + value);}table.close();}public static void getCell(String tableName, String rowKey, String familyName, String columnName) throws IOException {Table table = HBaseConnection.connection.getTable(TableName.valueOf(tableName));Get get = new Get(Bytes.toBytes(rowKey));get.addColumn(Bytes.toBytes(familyName),Bytes.toBytes(columnName));// Result -> Cell[]Result result = table.get(get);// cell存储非常底层Cell[] cells = result.rawCells();for (Cell cell : cells) {String value = new String(CellUtil.cloneValue(cell));String family = new String(CellUtil.cloneFamily(cell));String colunm = new String(CellUtil.cloneQualifier(cell));System.out.println(family + ":" + colunm + "," + value);}table.close();}public static void scanRows(String tableName, String startRowKey, String endRowKey) throws IOException {Table table = HBaseConnection.connection.getTable(TableName.valueOf(tableName));Scan scan = new Scan();// 指定起始的行 (包含)scan.setStartRow(Bytes.toBytes(startRowKey));// 指定结束的行 (默认不包含)scan.setStopRow(Bytes.toBytes(endRowKey));ResultScanner scanner = table.getScanner(scan);// Result记录一行数据,Cell数组// ResultScanner记录多行数据,Result数组for (Result result : scanner) {Cell[] cells = result.rawCells();for (Cell cell : cells) {String value = new String(CellUtil.cloneValue(cell));String family = new String(CellUtil.cloneFamily(cell));String colunm = new String(CellUtil.cloneQualifier(cell));System.out.print(family + ":" + colunm + "," + value + "\t");}System.out.println();}table.close();}public static void filterScan(String tableName, String startRowKey, String endRowKey, String familyName, String columnName, String val) throws IOException {Table table = HBaseConnection.connection.getTable(TableName.valueOf(tableName));Scan scan = new Scan();// 指定起始的行 (包含)scan.setStartRow(Bytes.toBytes(startRowKey));// 指定结束的行 (默认不包含)scan.setStopRow(Bytes.toBytes(endRowKey));FilterList filterList = new FilterList();//设置过滤器//SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(Bytes.toBytes(familyName),Bytes.toBytes(columnName),CompareFilter.CompareOp.LESS_OR_EQUAL,Bytes.toBytes(val));//添加过滤器filterList.addFilter(singleColumnValueFilter);scan.setFilter(filterList);ResultScanner scanner = table.getScanner(scan);// Result记录一行数据,Cell数组// ResultScanner记录多行数据,Result数组for (Result result : scanner) {Cell[] cells = result.rawCells();for (Cell cell : cells) {String value = new String(CellUtil.cloneValue(cell));String family = new String(CellUtil.cloneFamily(cell));String colunm = new String(CellUtil.cloneQualifier(cell));System.out.print(family + ":" + colunm + "," + value + "\t");}System.out.println();table.close();}}public static void main(String[] args) throws Exception {Configuration conf = HBaseConfiguration.create();conf.set("hbase.zookeeper.quorum", "master,slave1,slave2");conf.set("hbase.zookeeper.property.clientPort", "2181");HBaseConnection.connection = ConnectionFactory.createConnection(conf);String tableName = "temperature";String[] cols = {"cf"};if (!HBaseConnection.connection.getAdmin().tableExists(TableName.valueOf(tableName))) {createTable(tableName, cols);}BufferedReader bufferedReader = new BufferedReader(new FileReader("F:/temperature.log"));String line;while ((line = bufferedReader.readLine()) != null) {String[] splits = line.split(",");String id = splits[0].trim();String year = splits[1].trim();String temperature = splits[2].trim();String rowKey = id + ":" + year;putCell(tableName, rowKey, "cf", "id", id);putCell(tableName, rowKey, "cf", "year", year);putCell(tableName, rowKey, "cf", "temperature", temperature);}bufferedReader.close();HBaseConnection.closeConnection();}
}

相关运行结果:

java程序运行结果:

 hbase客户端运行结果:

scan 'temperature'

 

报错解决

一直运行中可能是设置连接的是ip,不是master,slave1,slave2,这种,可能报错Caused by: org.apache.hadoop.hbase.MasterNotRunningException: java.net.UnknownHostExce。在网上找了半天的原因也没有找到的话参考下面文章修改 windows的ssh配置文件:

ip,主机名供参考:

【hadoop】创建 SSH 别名来连接远程 linux-CSDN博客

http://www.xdnf.cn/news/336565.html

相关文章:

  • 【嵌入式开发-CAN】
  • 美化IDEA注释:Idea 中快捷键 Ctrl + / 自动注释的缩进(避免添加注释自动到行首)以及 Ctrl + Alt + l 全局格式化代码的注释缩进
  • Java 异常
  • 深入理解 Docker 网络原理:构建高效、灵活的容器网络
  • 缓存局部性保留
  • 【Python】PDF文件处理(PyPDF2、borb、fitz)
  • 2022年8月,​韩先超对中移信息进行微服务架构原理(Docker+k8s+DevOps+Go等)培训
  • MYSQL的行级锁到底锁的是什么东西
  • iOS 模块化开发流程
  • DeepSeek多尺度数据:无监督与原则性诊断方案全解析
  • 查看jdk是否安装并且配置成功?(Android studio安装前的准备)
  • Vue3 + Node.js 实现客服实时聊天系统(WebSocket + Socket.IO 详解)
  • 大模型深度思考与ReAct思维方式对比
  • Linux下部署Keepalived
  • Oracle免费认证来袭
  • 计算机学习路线与编程语言选择(信息差)
  • 排序算法-选择排序
  • 计算机网络常识:缓存、长短连接 网络初探、URL、客户端与服务端、域名操作 tcp 三次握手 四次挥手
  • v-model原理详解
  • Java 对象克隆(Object Cloning)详解
  • 【统计学基础】随机抽样的特点
  • Oracle OCP认证考试考点详解083系列13
  • Windows系统安装Cursor与远程调用本地模型QWQ32B实现AI辅助开发
  • 服务器托管的常见问题
  • Rspack:字节跳动自研 Web 构建工具-基于 Rust打造高性能前端工具链
  • C——VS的调试技巧
  • 图灵码上爬第5题:屠龙刀--爬虫逆向
  • 7系列 之 OSERDESE2
  • Pandas比MySQL快?
  • CentOS的防火墙工具(firewalld和iptables)的使用