三默网为您带来有关“Hive查询结果封装和查询日志回调”的文章内容,供您阅读参考。

Hive查询结果封装和查询日志回调

2023-01-21 17:12:53

本文主要解决之前开发数据中台平台时遇到的两个问题:一是将查询得到的结果集封装成前端可展示的数据,二是在返回结果的同时携带查询日志。当然,结果集和日志是否返回、以何种方式返回(比如 WebSocket),都可以根据需求决定。

下面介绍测试流程。

 

pom依赖

<dependency>
            <!-- Hive JDBC driver; supplies org.apache.hive.jdbc.HiveDriver and HiveStatement used by the Java classes in this article -->
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>3.1.2</version>
            <!-- NOTE(review): javax.el is removed from the transitive dependencies; the article does not say why (presumably a resolution or version conflict). Confirm before dropping the exclusion. -->
            <exclusions>
                <exclusion>
                    <groupId>org.glassfish</groupId>
                    <artifactId>javax.el</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

 

 包结构

Hive查询结果封装和查询日志回调

下面依次把代码贴上

package hive.model;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;


/**
 * Carrier for the outcome of a command: a success flag, an optional message,
 * the column names and the row data.
 *
 * <p>A freshly constructed instance is marked successful and holds empty,
 * mutable lists for the column names and the rows.
 */
public class CommandResult implements Serializable {

    protected boolean succeeded = true;
    protected String msg;
    protected List<String> colNm = new ArrayList<>();
    protected List<List<Object>> data = new ArrayList<>();

    /** Creates a successful result without a message. */
    public CommandResult() {
    }

    /**
     * Creates a successful result carrying a message.
     *
     * @param msg message to attach, may be {@code null}
     */
    public CommandResult(String msg) {
        this(true, msg);
    }

    /**
     * Creates a result with an explicit success flag and message.
     *
     * @param succeeded whether the command succeeded
     * @param msg       message to attach, may be {@code null}
     */
    public CommandResult(boolean succeeded, String msg) {
        this.succeeded = succeeded;
        this.msg = msg;
    }

    /**
     * Builds a failed result.
     *
     * @param msg failure description
     * @return a new result whose success flag is {@code false}
     */
    public static CommandResult error(String msg) {
        return new CommandResult(false, msg);
    }

    /**
     * Builds a successful result.
     *
     * @param msg informational message
     * @return a new result whose success flag is {@code true}
     */
    public static CommandResult success(String msg) {
        return new CommandResult(true, msg);
    }

    /** @return {@code true} when the command succeeded */
    public boolean isSucceeded() {
        return this.succeeded;
    }

    /** @param succeeded new value of the success flag */
    public void setSucceeded(boolean succeeded) {
        this.succeeded = succeeded;
    }

    /** @return the attached message, possibly {@code null} */
    public String getMsg() {
        return this.msg;
    }

    /** @param msg message to attach */
    public void setMsg(String msg) {
        this.msg = msg;
    }

    /** @return the column names (the stored list itself, not a copy) */
    public List<String> getColNm() {
        return this.colNm;
    }

    /**
     * Replaces the column names.
     *
     * @param colNm list to store (kept by reference)
     * @return this instance, to allow chaining
     */
    public CommandResult setColNm(List<String> colNm) {
        this.colNm = colNm;
        return this;
    }

    /** @return the rows, one inner list per row (the stored list itself, not a copy) */
    public List<List<Object>> getData() {
        return this.data;
    }

    /** @param data rows to store (kept by reference) */
    public void setData(List<List<Object>> data) {
        this.data = data;
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("CommandResult{");
        text.append("succeeded=").append(succeeded);
        text.append(", msg='").append(msg).append('\'');
        text.append(", colNm=").append(colNm);
        text.append(", data=").append(data);
        text.append('}');
        return text.toString();
    }
}
package com..hive.service;

import java.sql.Statement;

/**
 * Callback that is handed a JDBC {@link Statement}.
 *
 * <p>The interface has a single abstract method, so it can be implemented
 * with a lambda or a method reference.
 */
@FunctionalInterface
public interface HiveCallable {
    /**
     * Hive callback hook.
     *
     * @param statement the JDBC statement passed in by the caller
     */
    void call(Statement statement);
}
package com..hive.service;

import com..hive.HiveLog;
import lombok.Data;
import org.apache.hive.jdbc.HiveStatement;

import java.sql.Statement;

/**
 * {@link HiveCallable} that starts a {@link HiveLog} thread for the supplied
 * statement and exposes what that thread reports through its two properties.
 *
 * <p>Author: ZhiWen, 2021/6/7.
 */
@Data
public class HiveCallableTask implements HiveCallable{

    // Both fields are written by the HiveLog thread; volatile makes the
    // latest value visible to whichever thread reads the task.
    /** Log text most recently stored by the {@link HiveLog} thread. */
    private volatile String hiveLog;
    /** Progress description most recently stored by the {@link HiveLog} thread. */
    private volatile String process;

    /**
     * Starts log polling for the given statement.
     *
     * <p>Polling needs the Hive-specific statement API, so a statement that is
     * {@code null} or not a {@link HiveStatement} is skipped instead of
     * failing the caller with a {@link ClassCastException}.
     *
     * @param statement the statement whose query log should be followed
     */
    @Override
    public void call(Statement statement) {
        if (!(statement instanceof HiveStatement)) {
            return;
        }
        new HiveLog((HiveStatement) statement, this).start();
    }


}
package com..hive;

import com..hive.model.CommandResult;
import com..hive.service.HiveCallable;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

/**
 * @Author: ZhiWen
 * @Date: 2021/5/26
 * @Describe:
 */
public class Execute {

    private static Execute execute = new Execute();

    private Execute(){}

    public static Execute getInstance() {
        return execute;
    }

    public CommandResult query(Connection conn, String sql, HiveCallable callable) throws SQLException {
        Statement stat = null;
        ResultSet resultSet = null;
        try {
            stat = conn.createStatement();
            if (null != callable) {
                callable.call(stat);
            }
            resultSet = stat.executeQuery(sql);
            return getCommandResult(resultSet,100);
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        }finally {
            if (resultSet!=null){
                resultSet.close();
            }
            if (stat!=null){
                stat.close();
            }
        }
        return null;
    }


    public int execute(Connection conn, String sql, HiveCallable callable) throws SQLException {
        Statement stat = null;
        try {
            stat = conn.createStatement();
            if (null != callable) {
                callable.call(stat);
            }
            int i = stat.executeUpdate(sql);

            return i;
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        }finally {

            if (stat!=null){

                stat.close();
            }
        }
        return -1;
    }


    private CommandResult getCommandResult(ResultSet rs, int maxRowNum) throws SQLException {
        CommandResult result = new CommandResult();

        List<String> colNm;
        int colSize = rs.getMetaData().getColumnCount();
        colNm = new ArrayList<>(colSize);
        for (int i = 1; i <= colSize; i++) {
            colNm.add(rs.getMetaData().getColumnName(i));
        }
        result.setColNm(colNm);

        int indexResult = 0;
        List<List<Object>> resultData = new ArrayList<>();
        while (rs.next() && ((maxRowNum < 0) || (indexResult < maxRowNum))) {
            List<Object> rowList = new ArrayList<>();
            int cols = rs.getMetaData().getColumnCount();
            for (int i = 1; i <= cols; i++) {
                rowList.add(rs.getObject(i));
            }
            resultData.add(rowList);
            ++indexResult;
        }
        result.setData(resultData);
        return result;
    }
}
package com..hive;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

/**
 * Static helpers for opening and closing Hive JDBC connections.
 *
 * <p>Created by ZhiWen on 2021/4/29 11:44
 */
public class HiveConnection {

    private static final String DRIVER_CLASS = "org.apache.hive.jdbc.HiveDriver";
    private static final String DEFAULT_URL =
            "jdbc:hive2://0.0.0.0:10000/default;hive.server2.proxy.user=hadoop";
    private static final String DEFAULT_USER = "hadoop";
    private static final String DEFAULT_PASSWORD = "";

    /**
     * Opens a connection with the built-in default URL, user and password.
     *
     * @return the connection, or {@code null} when the driver class cannot be
     *         loaded or the connection attempt fails
     */
    public static Connection getConnection() {
        return getConnection(DEFAULT_URL, DEFAULT_USER, DEFAULT_PASSWORD);
    }

    /**
     * Opens a connection to the given JDBC URL.
     *
     * <p>Failures are printed rather than thrown, so callers must check the
     * return value for {@code null}.
     *
     * @param url      JDBC URL to connect to
     * @param user     user name passed to the driver
     * @param password password passed to the driver
     * @return the connection, or {@code null} when the driver class cannot be
     *         loaded or the connection attempt fails
     */
    public static Connection getConnection(String url, String user, String password) {
        Connection conn = null;
        try {
            Class.forName(DRIVER_CLASS);
            conn = DriverManager.getConnection(url, user, password);
        } catch (ClassNotFoundException | SQLException e) {
            System.out.println(e);
            e.printStackTrace();
        }
        System.out.println("获取到的Connection连接"+conn);
        return conn;
    }

    /**
     * Closes a connection, printing (not propagating) any failure.
     *
     * @param conn connection to close; {@code null} is ignored
     */
    public static void close(Connection conn) {
        if (conn == null) {
            return;
        }
        try {
            conn.close();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        }
    }

}
package com..hive;

import com..hive.service.HiveCallableTask;
import lombok.extern.slf4j.Slf4j;
import org.apache.hive.jdbc.HiveStatement;

import java.sql.SQLException;


/**
 * Thread that polls a {@link HiveStatement} for query-log lines while the
 * statement is open and reports more logs, printing each line and pushing it
 * to a {@link HiveCallableTask}.
 */
@Slf4j
public class HiveLog extends Thread {
    /** Maximum number of log lines requested per poll. */
    private static final int LINES_SIZE = 100;
    /** Pause after a poll that produced no lines, so the loop does not spin. */
    private static final long IDLE_POLL_INTERVAL_MS = 200L;

    private final HiveCallableTask task;

    /**
     * @param hiveStatement statement whose log is followed; when {@code null}
     *                      the thread exits immediately
     * @param task          receiver of the log text and progress marker
     */
    public HiveLog(HiveStatement hiveStatement, HiveCallableTask task) {
        this.hiveStatement = hiveStatement;
        this.task = task;
    }

    HiveStatement hiveStatement;

    /**
     * Polls the statement's log until it is closed, reports no more logs,
     * a SQL error occurs, or this thread is interrupted.
     */
    @Override
    public void run() { // the thread that actually prints the running progress
        if (hiveStatement == null) {
            return;
        }
        try {
            while (!hiveStatement.isClosed() && hiveStatement.hasMoreLogs()) {
                int fetched = 0;
                try {
                    for (String line : hiveStatement.getQueryLog(true, LINES_SIZE)) {
                        // NOTE(review): each line replaces the previous value, so the task
                        // only ever holds the latest line; confirm whether appending was intended.
                        task.setHiveLog(line + "\n");
                        System.out.println(line);
                        task.setProcess("运行日志输出");
                        fetched++;
                    }
                } catch (SQLException e) { // leave the loop on error so it cannot spin forever
                    log.error("HiveLog error !!", e);
                    return;
                }
                if (fetched == 0) {
                    // Nothing new: back off briefly instead of polling in a tight loop.
                    try {
                        Thread.sleep(IDLE_POLL_INTERVAL_MS);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        } catch (SQLException e) {
            log.error("HiveLog error !!", e);
        }
    }
}

测试类

package com..hive;

import com..hive.model.CommandResult;
import com..hive.service.HiveCallableTask;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;

/**
 * Manual smoke test: opens a Hive connection, switches role, runs one
 * statement with log streaming enabled and prints the result.
 *
 * <p>Author: ZhiWen, 2021/5/26.
 */
public class Do {

    /**
     * Entry point of the manual test.
     *
     * @param args unused
     * @throws SQLException declared by the {@link Execute} methods called here
     */
    public static void main(String[] args) throws SQLException {

        // Sample statements; only sql_3 is executed below, the others are
        // kept as ready-made alternatives to swap in.
        String sql_1 = "select\n" +
                "    *\n" +
                "from test.t_task_sql_content  limit 5";
        String sql_2 = "desc employee";
        String sql_3 = "desc formatted  employee";
        String sql_4 = "describe employee";

        Connection connection = HiveConnection.getConnection();
        if (connection == null) {
            // getConnection() reports failures by returning null.
            System.err.println("Could not open a Hive connection; aborting.");
            return;
        }

        try {
            Execute execute = Execute.getInstance();

            execute.execute(connection,"set role admin",null);

            CommandResult commandResult = execute.query(connection, sql_3, new HiveCallableTask());
            // println(Object) copes with null, unlike commandResult.toString().
            System.out.println(commandResult);
        } finally {
            // Always release the connection, even when a call above throws.
            HiveConnection.close(connection);
        }

    }
}

准备到微博写段子去了