Updated: 2023-09-13 17:08:04
It seems you are trying to read a Hive table into a pandas dataframe, apply some transformations, and save the result back to a Hive external table. Please refer to the code below as a sample. Here I read from a Hive table into a pandas dataframe and add a date column to it. I then use the subprocess module to run a shell command that loads the data into a Hive table partitioned on a date column.
from pyhive import hive
import pandas as pd
import datetime
import subprocess
import sys
# Connect to HiveServer2 (replace host and username with your own).
conn = hive.Connection(host="yourhost.com", port=10000, username="vikct001")
cursor = conn.cursor()
query = "select user_id,country from test_dev_db.test_data"
start_time = datetime.datetime.now()
output_file = '/home/vikct001/user/vikrant/python/test_data.csv'

# Read the Hive table into a pandas dataframe and add a date column.
data = pd.read_sql(query, conn)
data['current_date'] = datetime.date.today().strftime("%Y-%m-%d")
print(data)
data.to_csv(output_file, sep='|', encoding='utf-8', index=False)

# Shell command that loads the CSV into the partition for today's date.
# The shell expands $(date ...); Hive substitutes ${hivevar:loaded_date}.
hivequery = """hive --hivevar loaded_date=$(date +"%Y-%m-%d") -e 'LOAD DATA LOCAL INPATH "/home/vikct001/user/vikrant/python/test_data.csv" INTO TABLE test_dev_db.test_data_external PARTITION (loaded_date="${hivevar:loaded_date}")';"""
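
def save_to_hdfs(output_file):
    # Run the module-level hivequery command; output_file is the CSV it loads.
    print("I am here")
    p = subprocess.Popen(hivequery, shell=True, stderr=subprocess.PIPE)
    stdout, stderr = p.communicate()
    if p.returncode != 0:
        # stderr is bytes under Python 3, so decode it before printing.
        print(stderr.decode("utf-8", errors="replace"))
        sys.exit(1)

save_to_hdfs(output_file)
end_time = datetime.datetime.now()
print('processing ends', (end_time - start_time).total_seconds() / 60.0, 'minutes')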
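The command above depends on `shell=True` and on the shell's `date` utility. As a minimal alternative sketch, assuming the `hive` CLI is on the PATH, the command can be built as an argument list with the partition date computed in Python. The helper names `build_load_command` and `run_load` are hypothetical and not part of the original answer.

```python
import datetime
import subprocess


def build_load_command(csv_path, table, loaded_date):
    """Return the argv list for a `hive -e` call that loads csv_path into one partition."""
    statement = (
        'LOAD DATA LOCAL INPATH "{path}" INTO TABLE {table} '
        'PARTITION (loaded_date="{date}")'
    ).format(path=csv_path, table=table, date=loaded_date)
    return ["hive", "-e", statement]


def run_load(csv_path, table):
    """Run the load for today's date; raises CalledProcessError on a non-zero exit."""
    today = datetime.date.today().strftime("%Y-%m-%d")
    cmd = build_load_command(csv_path, table, today)
    # No shell=True: the whole statement reaches hive as a single argument.
    return subprocess.run(cmd, stderr=subprocess.PIPE, check=True)
```

Calling `run_load(output_file, "test_dev_db.test_data_external")` would then take the place of `save_to_hdfs(output_file)`. Passing a list avoids shell quoting problems if the path contains spaces.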
Table description:
hive (test_dev_db)> desc test_dev_db.test_data_external;
OK
id int
country string
input_date date
loaded_date string
# Partition Information
# col_name data_type comment
loaded_date string
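`LOAD DATA` copies the file into the table's location without transforming it. The field order and delimiter in the file therefore need to match the table definition, and a header line would be loaded as an ordinary row unless the table is configured to skip it. The partition column is not part of the file, because its value comes from the `PARTITION` clause. Below is a small sketch using the standard `csv` module. The function name `write_rows_for_hive` is hypothetical, and the `|` delimiter is an assumption about how the table was declared, taken from the `to_csv` call above.

```python
import csv


def write_rows_for_hive(rows, path, delimiter="|"):
    """Write rows as delimiter-separated text with no header line."""
    with open(path, "w", newline="", encoding="utf-8") as handle:
        writer = csv.writer(handle, delimiter=delimiter, lineterminator="\n")
        for row in rows:
            # Each row holds only the non-partition columns, in table order:
            # id, country, input_date.
            writer.writerow(row)
```

With pandas, a similar effect can be had by passing `header=False` to `to_csv`.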
You can see that the data has been loaded and a partition has been created for the current date.
hive (test_dev_db)> show partitions test_dev_db.test_data_external;
OK
loaded_date=2019-08-21
hive (test_dev_db)> select * from test_dev_db.test_data_external;
OK
1 India 2019-08-21 2019-08-21
2 Ukraine 2019-08-21 2019-08-21
1 India 2019-08-21 2019-08-21
2 Ukraine 2019-08-21 2019-08-21
1 India 2019-08-21 2019-08-21
2 Ukraine 2019-08-21 2019-08-21
1 India 2019-08-21 2019-08-21
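The repeated rows above are consistent with the load having been run several times for the same date, since `LOAD DATA ... INTO TABLE` appends to the partition. If each run should replace that day's data instead, Hive's `OVERWRITE` keyword can be added to the statement. A sketch follows, with `load_statement` as a hypothetical helper that is not part of the original answer.

```python
def load_statement(csv_path, table, loaded_date, overwrite=False):
    """Build a LOAD DATA statement; overwrite=True replaces the partition's contents."""
    mode = "OVERWRITE INTO" if overwrite else "INTO"
    return (
        'LOAD DATA LOCAL INPATH "{path}" {mode} TABLE {table} '
        'PARTITION (loaded_date="{date}")'
    ).format(path=csv_path, mode=mode, table=table, date=loaded_date)
```

The resulting string can be passed to `hive -e` in the same way as the statement in the script.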