但行好事
莫论前程❤

Hbase1.2.0 JavaAPI操作实例(新特性版本)

转发自:https://blog.liyang.io/360.html

package io.liyang.test.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/**
 * Created with IntelliJ IDEA.
 * User: liyang
 * Date: 16/6/29
 * Time: 下午12:15
 * DESCIPTION: Hbase 1.2.0 API
 */
public class HBaseTest {
    public static Configuration config = null;

    static {
        config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", "192.168.251.11:2181");
    }

    /**
     * 删除rowkey
     *
     * @param tableName 表名
     * @param rowKey    rowKey
     */
    public static void deleteAllColumn(String tableName, String rowKey) {
        try (Connection connection = ConnectionFactory.createConnection(config);
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            Delete delAllColumn = new Delete(Bytes.toBytes(rowKey));
            table.delete(delAllColumn);
            System.out.println("Delete AllColumn Success");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 删除指定列
     *
     * @param tableName  表名
     * @param rowKey     rowKey
     * @param familyName 列族
     * @param columnName 列名
     */
    public static void deleteColumn(String tableName, String rowKey, String familyName, String columnName) {
        try (Connection connection = ConnectionFactory.createConnection(config);
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            Delete delColumn = new Delete(Bytes.toBytes(rowKey));
            delColumn.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName));
            table.delete(delColumn);
            System.out.println("Delete Column Success!!!");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 查询多个版本的数据
     *
     * @param tableName  表名
     * @param rowKey     rowKey
     * @param familyName 列族
     * @param columnName 列名
     */
    public static void getResultByVersion(String tableName, String rowKey, String familyName, String columnName) {
        try (Connection connection = ConnectionFactory.createConnection(config);
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            Get get = new Get(Bytes.toBytes(rowKey));
            get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName));
            get.setMaxVersions(5);
            Result result = table.get(get);
            for (Cell cell : result.listCells()) {
                System.out.println("family:" + Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()));
                System.out.println("qualifier:" + Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()));
                System.out.println("value:" + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                System.out.println("Timestamp:" + cell.getTimestamp());
                System.out.println("--------------------------");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 更新某一列的值
     *
     * @param tableName  表名
     * @param rowKey     rowkey
     * @param familyName 列族
     * @param columnName 列名
     * @param value      值
     */
    public static void updateTable(String tableName, String rowKey, String familyName, String columnName, String value) {
        try (Connection connection = ConnectionFactory.createConnection(config);
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName), Bytes.toBytes(value));
            table.put(put);
            System.out.println("Update Table Success!!");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 查询某一列数据
     *
     * @param tableName  表名
     * @param rowKey     rowKey
     * @param familyName 列族
     * @param columnName 列名
     */
    public static void getResultByColumn(String tableName, String rowKey, String familyName, String columnName) {
        try (Connection connection = ConnectionFactory.createConnection(config);
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            Get get = new Get(Bytes.toBytes(rowKey));
            get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName));
            Result result = table.get(get);
            for (Cell cell : result.listCells()) {
                System.out.println("family:" + Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()));
                System.out.println("qualifier:" + Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()));
                System.out.println("value:" + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                System.out.println("Timestamp:" + cell.getTimestamp());
                System.out.println("-------------------------");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 范围查询数据
     *
     * @param tableName   表名
     * @param beginRowKey startRowKey
     * @param endRowKey   stopRowKey
     */
    public static void scanResult(String tableName, String beginRowKey, String endRowKey) {
        //String familyName = "datainfo";
        Scan scan = new Scan();
        scan.setStartRow(Bytes.toBytes(beginRowKey));
        scan.setStopRow(Bytes.toBytes(endRowKey));
        scan.setMaxVersions(1);
        scan.setCaching(20);
        scan.setBatch(10);
        //List<HbaseBean> beanList = new ArrayList<>();
        try (Connection connection = ConnectionFactory.createConnection(config);
             Table table = connection.getTable(TableName.valueOf(tableName));
             ResultScanner rs = table.getScanner(scan)) {
            for (Result result : rs) {
                //HbaseBean bean = result2Bean(result, familyName);
                System.out.println(Bytes.toString(result.getRow()));
                //以下是打印内容
                for (Cell cell : result.listCells()) {
                    System.out.println("family:" + Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()));
                    System.out.println("qualifier:" + Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()));
                    System.out.println("value:" + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                    System.out.println("Timestamp:" + cell.getTimestamp());
                    System.out.println("---------------------");
                }
                //beanList.add(bean);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 全表扫描数据
     *
     * @param tableName 表名
     */
    public static void scanResult(String tableName) {
        Scan scan = new Scan();
        try (Connection connection = ConnectionFactory.createConnection(config);
             Table table = connection.getTable(TableName.valueOf(tableName));
             ResultScanner rs = table.getScanner(scan)) {
            for (Result r : rs) {
                for (Cell cell : r.listCells()) {
                    System.out.println("family:" + Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()));
                    System.out.println("qualifier:" + Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()));
                    System.out.println("value:" + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                    System.out.println("Timestamp:" + cell.getTimestamp());
                    System.out.println("------------------------------");
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 根据rowkey查询数据
     *
     * @param tableName 表名
     * @param rowKey    rowKey
     * @return
     */
    public static Result getResult(String tableName, String rowKey) {
        Result result = null;
        try (Connection connection = ConnectionFactory.createConnection(config);
             Table table = connection.getTable(TableName.valueOf(tableName))
        ) {
            Get get = new Get(Bytes.toBytes(rowKey));
            result = table.get(get);
            for (Cell cell : result.listCells()) {
                System.out.println("family:" + Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()));
                System.out.println("qualifier:" + Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()));
                System.out.println("value:" + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                System.out.println("Timestamp:" + cell.getTimestamp());
                System.out.println("-------------------------------");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return result;
    }

    /**
     * 添加数据
     *
     * @param rowKey    rowKey
     * @param tableName 表名
     * @param column    列名
     * @param value     值
     */
    public static void addData(String rowKey, String tableName, String[] column, String[] value) {
        try (Connection connection = ConnectionFactory.createConnection(config);
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            Put put = new Put(Bytes.toBytes(rowKey));
            HColumnDescriptor[] columnFamilies = table.getTableDescriptor().getColumnFamilies();
            for (int i = 0; i < columnFamilies.length; i++) {
                String familyName = columnFamilies[i].getNameAsString();
                if (familyName.equals("userinfo")) {
                    for (int j = 0; j < column.length; j++) {
                        put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(column[j]), Bytes.toBytes(value[j]));
                    }
                }
                table.put(put);
                System.out.println("Add Data Success!!!-");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 删除表
     *
     * @param tableName 表名
     */
    public static void deleteTable(String tableName) {
        try (Connection connection = ConnectionFactory.createConnection(config);
             Admin admin = connection.getAdmin()) {
            admin.disableTable(TableName.valueOf(tableName));
            admin.deleteTable(TableName.valueOf(tableName));
            System.out.println(tableName + " is deleted!!");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 创建Table
     *
     * @param tableName 表名
     * @param family    列族
     */
    public static void createTable(String tableName, String[] family) {
        HTableDescriptor table = new HTableDescriptor(TableName.valueOf(tableName));
        try (Connection connection = ConnectionFactory.createConnection(config);
             Admin admin = connection.getAdmin()) {

            for (int i = 0; i < family.length; i++) {
                table.addFamily(new HColumnDescriptor(family[i]));
            }
            if (admin.tableExists(TableName.valueOf(tableName))) {
                System.out.println("Table Exists!!");
                System.exit(0);
            } else {
                admin.createTable(table);
                System.out.println("Create Table Success!!! Table Name :[ " + tableName + " ]");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 将结果集中的数据封装到JavaBean中
     *
     * @param result     HbaseResult
     * @param familyName 列族
     * @return
     */
    public static HbaseBean result2Bean(Result result, String familyName) {
        HbaseBean bean = new HbaseBean();
        bean.setMessageId(Bytes.toString(result.getValue(Bytes.toBytes(familyName), Bytes.toBytes("messageId"))));
        bean.setOwnerUri(Bytes.toString(result.getValue(Bytes.toBytes(familyName), Bytes.toBytes("ownerUri"))));
        bean.setOwnerSite(Bytes.toString(result.getValue(Bytes.toBytes(familyName), Bytes.toBytes("ownerSite"))));
        bean.setPeerUri(Bytes.toString(result.getValue(Bytes.toBytes(familyName), Bytes.toBytes("peerUri"))));
        bean.setPeerSite(Bytes.toString(result.getValue(Bytes.toBytes(familyName), Bytes.toBytes("peerSite"))));
        bean.setPeerurisite(Bytes.toString(result.getValue(Bytes.toBytes(familyName), Bytes.toBytes("peerurisite"))));
        bean.setReferBy(Bytes.toString(result.getValue(Bytes.toBytes(familyName), Bytes.toBytes("referBy"))));
        bean.setStatusCode(Bytes.toString(result.getValue(Bytes.toBytes(familyName), Bytes.toBytes("statusCode"))));
        bean.setUserAgent(Bytes.toString(result.getValue(Bytes.toBytes(familyName), Bytes.toBytes("userAgent"))));
        bean.setMessageType(Bytes.toString(result.getValue(Bytes.toBytes(familyName), Bytes.toBytes("messageType"))));
        bean.setMessageTime(Bytes.toString(result.getValue(Bytes.toBytes(familyName), Bytes.toBytes("messageTime"))));
        bean.setMessageKindType(Bytes.toString(result.getValue(Bytes.toBytes(familyName), Bytes.toBytes("messageKindType"))));
        bean.setMessageFlags(Bytes.toString(result.getValue(Bytes.toBytes(familyName), Bytes.toBytes("messageFlags"))));
        bean.setMessageDirection(Bytes.toString(result.getValue(Bytes.toBytes(familyName), Bytes.toBytes("messageDirection"))));
        bean.setFileSize(Bytes.toString(result.getValue(Bytes.toBytes(familyName), Bytes.toBytes("fileSize"))));
        bean.setFileName(Bytes.toString(result.getValue(Bytes.toBytes(familyName), Bytes.toBytes("fileName"))));
        bean.setClientIP(Bytes.toString(result.getValue(Bytes.toBytes(familyName), Bytes.toBytes("clientIP"))));
        bean.setAccessAddress(Bytes.toString(result.getValue(Bytes.toBytes(familyName), Bytes.toBytes("accessAddress"))));
        bean.setContentType(Bytes.toString(result.getValue(Bytes.toBytes(familyName), Bytes.toBytes("contentType"))));
        return bean;

    }


    public static void main(String[] args) throws IOException {

//        String[] family = {"userinfo"};
//        createTable("user2", family);


        String[] column = {"name", "age", "email", "phone"};
        /*
        getResult("user1", "rowkey2");
        */

        /*
        scanResult("user1");
        */


        //scanResult("ott_message", "2016070110355700001", "2016070110355700007");
        scanResult("user1", "rowkey1", "rowkey3");



        /*
        getResultByColumn("user1", "rowkey1", "userinfo", "name");
        updateTable("user1", "rowkey1", "userinfo", "name", "zs");
        getResultByColumn("user1", "rowkey1", "userinfo", "name");
        */

        /*
        getResultByVersion("user1", "rowkey1", "userinfo", "name");
        */

        /*
        deleteColumn("user1","rowkey1","userinfo","email");
        */

        /*
        deleteAllColumn("user1","rowkey1");
        */

    }
}
赞(0) 打赏
未经允许不得转载:刘鹏博客 » Hbase1.2.0 JavaAPI操作实例(新特性版本)
分享到: 更多 (0)

评论 抢沙发

评论前必须登录!

 

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏