スポンサーリンク

【Python】データフレームをSnowflakeのテーブルへインポートする方法

記事内に広告が含まれています。

CSVファイルやTSVファイルなどに格納されたデータをSnowflakeのテーブルへインポートするPythonコードを記載します。

GUIでもインポートできますが、ファイルサイズの制限(50 MB)があり、やや不便です。

COPYコマンドの利用でもインポートできますが、Pythonから実行するための方法として記載します。

スポンサーリンク

サンプルコード

CSVファイルからデータフレームを作成し、Snowflakeのテーブルへ格納するコードです。

import os
import pandas as pd

import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas
from dotenv import load_dotenv
load_dotenv(override=True)


def get_connection():
    conn = snowflake.connector.connect(
        user=os.environ['SNOWFLAKE_USER'],
        password=os.environ['SNOWFLAKE_PASSWORD'],
        account=os.environ['SNOWFLAKE_ACCOUNT'],
        warehouse=os.environ['SNOWFLAKE_WAREHOUSE'],
        database=os.environ['SNOWFLAKE_DATABASE'],
        schema=os.environ['SNOWFLAKE_SCHEMA']
    )
    return conn


def main():

    # ヘッダーなし、3カラムのcsvファイルのデータをデータフレームに格納
    column_names = ['Column1', 'Column2', 'Column3']
    df = pd.read_csv('input.csv', header=None, names=column_names)

    conn = get_connection()
    success, nchunks, nrows, _ = write_pandas(conn, df, 'TABLE_NAME')


if __name__ == "__main__":
    main()

Snowflakeのアクセス情報は環境変数に設定しています。

カラム名(’Column1′, ‘Column2’, ‘Column3’)やテーブル名(’TABLE_NAME’)は取込先にあわせて変更してください。

スポンサーリンク

コード解説

必要なモジュールのインポート

import os
import pandas as pd
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas
from dotenv import load_dotenv
  • os
    環境変数の操作に使用されます。
  • pandas
    データ操作と解析のためのライブラリ。
  • snowflake.connector
    Snowflakeデータベースに接続するためのライブラリ。
  • write_pandas
    PandasデータフレームをSnowflakeテーブルに書き込むための関数。
  • dotenv
    環境変数を設定ファイルから読み込むためのライブラリ。

環境変数の読込み

.envファイルから環境変数を読み込みます。

load_dotenv(override=True)

.envファイルには、以下のようにSnowflakeの接続情報を記述します。

SNOWFLAKE_USER=xxxxxxxxxxxxxxxxxxxxxx
SNOWFLAKE_PASSWORD=xxxxxxxxxxxxxxxxxxxxxx
SNOWFLAKE_ACCOUNT=xxxxxxxxxxxxxxxxxxxxxx
SNOWFLAKE_WAREHOUSE=xxxxxxxxxxxxxxxxxxxxxx
SNOWFLAKE_DATABASE=xxxxxxxxxxxxxxxxxxxxxx
SNOWFLAKE_SCHEMA=xxxxxxxxxxxxxxxxxxxxxx

Snowflakeへの接続を確立する関数

def get_connection():
    conn = snowflake.connector.connect(
        user=os.environ['SNOWFLAKE_USER'],
        password=os.environ['SNOWFLAKE_PASSWORD'],
        account=os.environ['SNOWFLAKE_ACCOUNT'],
        warehouse=os.environ['SNOWFLAKE_WAREHOUSE'],
        database=os.environ['SNOWFLAKE_DATABASE'],
        schema=os.environ['SNOWFLAKE_SCHEMA']
    )
    return conn
  • get_connection関数は、環境変数からSnowflakeの接続情報を取得し、Snowflakeへの接続を確立して接続オブジェクトを返します。

メイン処理

def main():

    # ヘッダーなし、3カラムのcsvファイルのデータをデータフレームに格納
    column_names = ['Column1', 'Column2', 'Column3']
    df = pd.read_csv('input.csv', header=None, names=column_names)

    conn = get_connection()
    success, nchunks, nrows, _ = write_pandas(conn, df, 'TABLE_NAME')
  • main関数は、CSVファイル input.csv を読み込み、3つのカラム Column1, Column2, Column3 を持つデータフレーム df を作成します。
  • get_connection関数を呼び出してSnowflakeへの接続を確立します。
  • write_pandas関数を使用して、データフレーム dfTABLE_NAME という名前のSnowflakeテーブルに書き込みます。戻り値は書き込みの成功ステータス、チャンク数、書き込まれた行数などです。

エントリーポイント

スクリプトが直接実行されたときに main 関数を呼び出します。

if __name__ == "__main__":
    main()
スポンサーリンク
Python
著者SNS
タイトルとURLをコピーしました