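Create a DataFrame using an AWS Athena query and Boto3

I am using AWS Athena to query raw data from S3. Since Athena writes the query output to an S3 output bucket, I use a Lambda function to load the Athena query results into a DataFrame:

My code: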

def athena_query_to_dataframe(db, s3Bucket, query):

    import boto3
    import pandas as pd

    client = boto3.client('athena')
    listOfStatus = ['SUCCEEDED', 'FAILED', 'CANCELLED']
    listOfInitialStatus = ['RUNNING', 'QUEUED']

    print('Starting Query Execution:')

    tempS3Path = 's3://{}'.format(s3Bucket)

    response = client.start_query_execution(
        QueryString = query,
        QueryExecutionContext = {
            'Database': db
        },
        ResultConfiguration = {
            'OutputLocation': tempS3Path,
        }
    )

    queryExecutionId = response['QueryExecutionId']

    status = client.get_query_execution(QueryExecutionId = queryExecutionId)['QueryExecution']['Status']['State']

    while status in listOfInitialStatus:
        status = client.get_query_execution(QueryExecutionId = queryExecutionId)['QueryExecution']['Status']['State']
        if status in listOfStatus:
            if status == 'SUCCEEDED':
                print('Query Succeeded!')
                paginator = client.get_paginator('get_query_results')
                query_results = paginator.paginate(
                    QueryExecutionId = queryExecutionId,
                    PaginationConfig = {'PageSize': 1000}
                )
            elif status == 'FAILED':
                print('Query Failed!')
            elif status == 'CANCELLED':
                print('Query Cancelled!')
            break

    results = []
    rows = []

    print('Processing Response')

    for page in query_results:
        for row in page['ResultSet']['Rows']:
            rows.append(row['Data'])

    columns = rows[0]
    rows = rows[1:]

    columns_list = []
    for column in columns:
        columns_list.append(column['VarCharValue'])

    print('Creating Dataframe')

    dataframe = pd.DataFrame(columns = columns_list)

    for row in rows:
        df_row = []
        try:
            for data in row:
                df_row.append(data['VarCharValue'])
            dataframe.loc[len(dataframe)] = df_row
        except:
            pass

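When I try to return df.shape, I only get (0, 20), which means the DataFrame was not populated with any rows.

I am looking for the following:

  1. A fix for the issue above so that the rows are populated.
  2. A better way to get the DataFrame, if there is one.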

Source: Stack Overflow, "Create dataframe using AWS athena query and Boto3"

Answers:


The simplest answer is to use {JMX=}.
{JMX=}
Replace {JVM=} and {JVM=} with your values.
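
The library name and snippet in this answer did not survive in this copy. Since the next answer refers to awswrangler, here is a minimal sketch assuming that library was the one meant. The function name `athena_to_dataframe` and the names `my_table` and `my_database` are placeholders for illustration, not values from the original answer.

```python
def athena_to_dataframe(sql, database):
    """Run an Athena query and return the result as a pandas DataFrame."""
    # Assumption: awswrangler is installed and AWS credentials are configured.
    # Imported inside the function so that defining it does not need the package.
    import awswrangler as wr

    # read_sql_query starts the query, waits for it to finish, and loads the result.
    return wr.athena.read_sql_query(sql=sql, database=database)


# Example call (placeholder names, requires AWS access):
# df = athena_to_dataframe("SELECT * FROM my_table", "my_database")
# print(df.shape)
```

This replaces the manual polling and pagination in the question with a single call, at the cost of an extra dependency in the Lambda package.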


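You can also use the pandas built-in function pd.read_sql(), since awswrangler needs several permissions to run. Note that pd.read_sql() requires a connection as its second argument; the example below assumes one from the PyAthena package, which the original answer does not name, and the bucket and region are placeholders.

import pandas as pd
from pyathena import connect  # assumption: PyAthena provides the connection

conn = connect(s3_staging_dir="s3://your-bucket/athena-results/", region_name="us-east-1")
df = pd.read_sql("select * from db_name.table_name", conn)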
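
On the first point of the question (rows not being populated), one likely cause is that `get_query_results` returns NULL cells as empty dicts with no `VarCharValue` key. In that case `data['VarCharValue']` raises a KeyError, the bare `except: pass` hides it, and every row containing a NULL is dropped. The sketch below shows one way to parse the pages that tolerates missing values. The helper name `rows_from_pages` and the sample page are hypothetical and hand-written, not real Athena output.

```python
def rows_from_pages(pages):
    """Flatten Athena get_query_results pages into (columns, records)."""
    raw_rows = []
    for page in pages:
        for row in page["ResultSet"]["Rows"]:
            # .get() yields None for a cell without "VarCharValue"
            # instead of raising KeyError.
            raw_rows.append([cell.get("VarCharValue") for cell in row["Data"]])
    if not raw_rows:
        return [], []
    # For SELECT queries the first row of the first page holds the column names.
    return raw_rows[0], raw_rows[1:]


# Hand-written sample page with one NULL cell (an empty dict).
sample_pages = [
    {"ResultSet": {"Rows": [
        {"Data": [{"VarCharValue": "id"}, {"VarCharValue": "name"}]},
        {"Data": [{"VarCharValue": "1"}, {}]},
        {"Data": [{"VarCharValue": "2"}, {"VarCharValue": "bob"}]},
    ]}}
]
columns, records = rows_from_pages(sample_pages)
print(columns)
print(records)
```

With the real paginator output passed in, `pd.DataFrame(records, columns=columns)` builds the frame in one call, which avoids appending row by row with `dataframe.loc`. All values arrive as strings, so numeric columns still need converting. The function in the question also never returns `dataframe`, and `query_results` is only assigned inside the polling loop, so both would need attention as well.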