且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

如何使用 Python(不使用 PySpark)将 Pandas 数据帧插入到现有的 Hive 外部表中?

更新时间:2023-09-13 17:08:04

您似乎正在尝试从 Hive 表中读入 Pandas 数据帧并进行一些转换并将其保存回某些 Hive 外部表.请参考以下代码作为示例.在这里,我已从 Hive 表中读取到 Pandas 数据框中,并在其中添加了一些日期列.后来我使用 subprocess 模块来执行我的 shell,它将数据加载到 Hive 表中,该表在某个日期列上进行分区.

It seems that you are trying to read into pandas dataframe from Hive table and doing some transformation and saving that back to some Hive external table. Please refer below code as sample. Here I have read from Hive table into pandas dataframe and added some date column to it. Later I have used subprocess module to execute my shell, which will load data into Hive table which is partitioned on some date column.

from pyhive import hive
import pandas as pd
import sqlalchemy
from sqlalchemy.engine import create_engine
import datetime
from subprocess import PIPE, Popen
import subprocess
import sys

conn = hive.Connection(host="yourhost.com", port=10000, username="vikct001")
cursor = conn.cursor()

query="select user_id,country from test_dev_db.test_data"

start_time= datetime.datetime.now()

output_file='/home/vikct001/user/vikrant/python/test_data.csv'

data=pd.read_sql(query,conn)
data['current_date'] = pd.datetime.today().strftime("%Y-%m-%d")
print(data)

data.to_csv(output_file, sep='|', encoding='utf-8',index=None)

hivequery=""" hive --hivevar loaded_date=$(date +"%Y-%m-%d") hive -e 'LOAD DATA LOCAL INPATH "/home/vikct001/user/vikrant/python/test_data.csv" INTO TABLE test_dev_db.test_data_external PARTITION (loaded_date="${hivevar:loaded_date}")';"""

def save_to_hdfs(output_file):
        print("I am here")
        p=subprocess.Popen(hivequery,shell=True,stderr=subprocess.PIPE)
        stdout,stderr = p.communicate()
        if p.returncode != 0:
            print stderr
            sys.exit(1)


save_to_hdfs(output_file)
end_time=datetime.datetime.now()

print 'processing ends', (start_time-end_time).seconds/60.0,' minutes'

表格说明:

hive (test_dev_db)> desc test_dev_db.test_data_external;
OK
id                      int
country                 string
input_date              date
loaded_date             string

# Partition Information
# col_name              data_type               comment

loaded_date             string

可以看到数据已经加载完成并创建了一个当前日期的分区.

you can see that data has been loaded and created a partition with current date.

hive (test_dev_db)> show partitions test_dev_db.test_data_external;
OK
loaded_date=2019-08-21


hive (test_dev_db)> select * from test_dev_db.test_data_external;
OK
1       India   2019-08-21      2019-08-21
2       Ukraine 2019-08-21      2019-08-21
1       India   2019-08-21      2019-08-21
2       Ukraine 2019-08-21      2019-08-21
1       India   2019-08-21      2019-08-21
2       Ukraine 2019-08-21      2019-08-21
1       India   2019-08-21      2019-08-21