三默网为您带来有关“Hive查询结果封装和查询日志回调”的文章内容,供您阅读参考。
Hive查询结果封装和查询日志回调
2023-01-21 17:12:53
本文章主要是解决之前在写数据中台平台的时候遇到的两个问题:一是把查询后的结果集封装为前端展示数据,二是携带查询日志。当然,结果集和日志可根据需求决定是否返回以及返回方式,比如通过 websocket 推送。
下面做测试流程介绍
pom依赖
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>3.1.2</version>
<exclusions>
<exclusion>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
</exclusion>
</exclusions>
</dependency>
包结构
下面依次把代码贴上。(注意:文中包名 com..hive 是脱敏后的残缺写法,直接复制无法编译,请替换为合法包名,例如 com.example.hive;另外 CommandResult 声明的包 hive.model 与 import 的 com..hive.model 不一致,使用时需自行统一。)
package hive.model;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/**
 * Serializable wrapper for the outcome of a SQL command: a success flag,
 * a status message, the result-set column names and the row data.
 */
public class CommandResult implements Serializable {

    /** Explicit version id so the serialized form stays stable across edits. */
    private static final long serialVersionUID = 1L;

    /** Whether the command succeeded; defaults to {@code true}. */
    protected boolean succeeded = true;
    /** Human-readable status or error message; may be {@code null}. */
    protected String msg;
    /** Column names of the result set, in select order; never {@code null}. */
    protected List<String> colNm = new ArrayList<>();
    /** Row data: one inner list per row, one element per column. */
    protected List<List<Object>> data = new ArrayList<>();

    public CommandResult() {
    }

    public CommandResult(String msg) {
        this.msg = msg;
    }

    public CommandResult(boolean succeeded, String msg) {
        this.succeeded = succeeded;
        this.msg = msg;
    }

    /**
     * Builds a failed result carrying {@code msg}.
     *
     * @param msg error message to report
     * @return a result with {@code succeeded == false}
     */
    public static CommandResult error(String msg) {
        // Use the two-arg constructor directly instead of construct-then-set.
        return new CommandResult(false, msg);
    }

    /**
     * Builds a successful result carrying {@code msg}.
     *
     * @param msg status message to report
     * @return a result with {@code succeeded == true}
     */
    public static CommandResult success(String msg) {
        return new CommandResult(true, msg);
    }

    public boolean isSucceeded() {
        return succeeded;
    }

    public void setSucceeded(boolean succeeded) {
        this.succeeded = succeeded;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    public List<String> getColNm() {
        return colNm;
    }

    /**
     * Sets the column names.
     *
     * @param colNm column names in select order
     * @return this result, for call chaining
     */
    public CommandResult setColNm(List<String> colNm) {
        this.colNm = colNm;
        return this;
    }

    public List<List<Object>> getData() {
        return data;
    }

    public void setData(List<List<Object>> data) {
        this.data = data;
    }

    @Override
    public String toString() {
        return "CommandResult{" +
                "succeeded=" + succeeded +
                ", msg='" + msg + '\'' +
                ", colNm=" + colNm +
                ", data=" + data +
                '}';
    }
}
package com..hive.service;
import java.sql.Statement;
/**
 * Callback handed the JDBC {@link Statement} right after it is created and
 * before the SQL executes. Implementations typically use it to start side
 * work against the statement, such as tailing the Hive query log.
 */
@FunctionalInterface
public interface HiveCallable {

    /**
     * Invoked with the statement that will run the query.
     *
     * @param statement the just-created, not-yet-executed statement
     */
    void call(Statement statement);
}
package com..hive.service;
import com..hive.HiveLog;
import lombok.Data;
import org.apache.hive.jdbc.HiveStatement;
import java.sql.Statement;
/**
 * {@link HiveCallable} implementation that, when invoked, starts a
 * {@code HiveLog} thread to tail the running query's server-side log.
 *
 * <p>The {@code hiveLog} and {@code process} fields are written by that
 * thread through the Lombok-generated setters while the query runs.
 *
 * @author ZhiWen
 * @since 2021/6/7
 */
@Data
public class HiveCallableTask implements HiveCallable{
// Latest log chunk fetched from HiveServer2 (overwritten on each fetch).
private String hiveLog;
// Short, human-readable progress note.
private String process;
@Override
public void call(Statement statement) {
// The cast is required: plain java.sql.Statement has no query-log API,
// only the driver's HiveStatement exposes getQueryLog().
new HiveLog((HiveStatement) statement, this).start();
}
}
package com..hive;
import com..hive.model.CommandResult;
import com..hive.service.HiveCallable;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
/**
 * Singleton helper that runs SQL over a caller-supplied JDBC connection and
 * packages query results into {@link CommandResult}.
 *
 * <p>Unlike the original version, failures propagate as {@link SQLException}
 * (already declared on every method) instead of being printed and converted
 * into {@code null}/{@code -1}, which made callers NPE later.
 *
 * @author ZhiWen
 * @since 2021/5/26
 */
public class Execute {

    /** Default cap on rows copied out of a query result. */
    private static final int DEFAULT_MAX_ROWS = 100;

    private static Execute execute = new Execute();

    private Execute(){}

    public static Execute getInstance() {
        return execute;
    }

    /**
     * Runs {@code sql} as a query and copies at most {@value #DEFAULT_MAX_ROWS}
     * rows into a {@link CommandResult}.
     *
     * @param conn     open connection; not closed by this method
     * @param callable optional hook invoked with the statement before execution
     *                 (e.g. to start log tailing); may be {@code null}
     * @return the wrapped column names and row data
     * @throws SQLException if statement creation or execution fails
     */
    public CommandResult query(Connection conn, String sql, HiveCallable callable) throws SQLException {
        return query(conn, sql, callable, DEFAULT_MAX_ROWS);
    }

    /**
     * Same as {@link #query(Connection, String, HiveCallable)} but with an
     * explicit row cap; a negative {@code maxRowNum} means "no limit".
     */
    public CommandResult query(Connection conn, String sql, HiveCallable callable, int maxRowNum) throws SQLException {
        // try-with-resources closes statement and result set even on failure.
        try (Statement stat = conn.createStatement()) {
            if (null != callable) {
                callable.call(stat);
            }
            try (ResultSet resultSet = stat.executeQuery(sql)) {
                return getCommandResult(resultSet, maxRowNum);
            }
        }
    }

    /**
     * Runs {@code sql} as an update/DDL statement.
     *
     * @param conn     open connection; not closed by this method
     * @param callable optional pre-execution hook; may be {@code null}
     * @return the statement's update count
     * @throws SQLException if statement creation or execution fails
     */
    public int execute(Connection conn, String sql, HiveCallable callable) throws SQLException {
        try (Statement stat = conn.createStatement()) {
            if (null != callable) {
                callable.call(stat);
            }
            return stat.executeUpdate(sql);
        }
    }

    /**
     * Copies column names and up to {@code maxRowNum} rows out of {@code rs}.
     * The result set is not closed here; the caller owns it.
     */
    private CommandResult getCommandResult(ResultSet rs, int maxRowNum) throws SQLException {
        CommandResult result = new CommandResult();
        // Fetch metadata once instead of once per row as before.
        ResultSetMetaData meta = rs.getMetaData();
        int colSize = meta.getColumnCount();
        List<String> colNm = new ArrayList<>(colSize);
        for (int i = 1; i <= colSize; i++) {
            colNm.add(meta.getColumnName(i));
        }
        result.setColNm(colNm);

        int rowCount = 0;
        List<List<Object>> resultData = new ArrayList<>();
        // Negative maxRowNum disables the cap.
        while (rs.next() && ((maxRowNum < 0) || (rowCount < maxRowNum))) {
            List<Object> rowList = new ArrayList<>(colSize);
            for (int i = 1; i <= colSize; i++) {
                rowList.add(rs.getObject(i));
            }
            resultData.add(rowList);
            ++rowCount;
        }
        result.setData(resultData);
        return result;
    }
}
package com..hive;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
/**
 * Small helper for opening and closing Hive JDBC connections.
 *
 * <p>Created by ZhiWen on 2021/4/29 11:44
 */
public class HiveConnection {

    /** HiveServer2 URL; proxies as the "hadoop" user. */
    private static final String URL =
            "jdbc:hive2://0.0.0.0:10000/default;hive.server2.proxy.user=hadoop";

    /**
     * Opens a connection to HiveServer2.
     *
     * @return an open connection, or {@code null} when the driver is missing
     *         or the connect attempt fails — callers must null-check
     */
    public static Connection getConnection() {
        Connection conn = null;
        try {
            // Not strictly required with JDBC 4+ driver auto-discovery, but
            // harmless and keeps the driver dependency explicit.
            Class.forName("org.apache.hive.jdbc.HiveDriver");
            conn = DriverManager.getConnection(URL, "hadoop", "");
        } catch (ClassNotFoundException | SQLException e) {
            // Single multi-catch replaces the two duplicated catch blocks,
            // and one stack trace replaces the println+printStackTrace pair.
            e.printStackTrace();
        }
        System.out.println("获取到的Connection连接"+conn);
        return conn;
    }

    /**
     * Closes {@code conn}, tolerating {@code null} and logging close failures.
     *
     * @param conn connection to close; may be {@code null}
     */
    public static void close(Connection conn) {
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
package com..hive;
import com..hive.service.HiveCallableTask;
import lombok.extern.slf4j.Slf4j;
import org.apache.hive.jdbc.HiveStatement;
import java.sql.SQLException;
@Slf4j
public class HiveLog extends Thread {

    /** Max log lines fetched from HiveServer2 per poll. */
    private static final int LINES_SIZE = 100;

    // May be null; run() then exits immediately.
    private final HiveStatement hiveStatement;
    private final HiveCallableTask task;

    public HiveLog(HiveStatement hiveStatement, HiveCallableTask task) {
        this.hiveStatement = hiveStatement;
        this.task = task;
    }

    /**
     * Polls the statement's server-side query log until the statement is
     * closed or reports no more logs, pushing each fetched line onto the
     * task and echoing it to stdout.
     */
    @Override
    public void run() {
        if (hiveStatement == null) {
            return;
        }
        try {
            while (!hiveStatement.isClosed() && hiveStatement.hasMoreLogs()) {
                try {
                    // "line", not "log": the original name shadowed the
                    // Lombok-generated slf4j logger field.
                    for (String line : hiveStatement.getQueryLog(true, LINES_SIZE)) {
                        task.setHiveLog(line + "\n");
                        System.out.println(line);
                        task.setProcess("运行日志输出");
                    }
                } catch (SQLException e) {
                    // Return so a fetch error cannot leave the loop spinning forever.
                    log.error("HiveLog error !!", e);
                    return;
                }
            }
        } catch (SQLException e) {
            log.error("HiveLog error !!", e);
        }
    }
}
测试类
package com..hive;
import com..hive.model.CommandResult;
import com..hive.service.HiveCallableTask;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
/**
 * Manual smoke test: opens a connection, runs a statement plus a query with
 * log tailing, and prints the wrapped result.
 *
 * @author ZhiWen
 * @since 2021/5/26
 */
public class Do {
    public static void main(String[] args) throws SQLException {
        // Sample statements; swap the one passed to query() below.
        String sql_1 = "select\n" +
                " *\n" +
                "from test.t_task_sql_content limit 5";
        String sql_2 = "desc employee";
        String sql_3 = "desc formatted employee";
        String sql_4 = "describe employee";
        Connection connection = HiveConnection.getConnection();
        // getConnection() returns null on failure; bail out instead of NPE-ing.
        if (connection == null) {
            System.out.println("No connection, aborting.");
            return;
        }
        try {
            Execute execute = Execute.getInstance();
            execute.execute(connection, "set role admin", null);
            CommandResult commandResult = execute.query(connection, sql_3, new HiveCallableTask());
            System.out.println(commandResult == null ? "null" : commandResult.toString());
        } finally {
            // Close even when a statement throws; the original leaked on error.
            HiveConnection.close(connection);
        }
    }
}
准备到微博写段子去了