CSVファイルやTSVファイルなどに格納されたデータをSnowflakeのテーブルへインポートするPythonコードを記載します。
GUIでもインポートできますが、ファイルサイズの制限(50 MB)があり、やや不便です。
COPYコマンドの利用でもインポートできますが、Pythonから実行するための方法として記載します。
サンプルコード
CSVファイルからデータフレームを作成し、Snowflakeのテーブルへ格納するコードです。
import os
import pandas as pd
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas
from dotenv import load_dotenv
load_dotenv(override=True)
def get_connection():
conn = snowflake.connector.connect(
user=os.environ['SNOWFLAKE_USER'],
password=os.environ['SNOWFLAKE_PASSWORD'],
account=os.environ['SNOWFLAKE_ACCOUNT'],
warehouse=os.environ['SNOWFLAKE_WAREHOUSE'],
database=os.environ['SNOWFLAKE_DATABASE'],
schema=os.environ['SNOWFLAKE_SCHEMA']
)
return conn
def main():
# ヘッダーなし、3カラムのcsvファイルのデータをデータフレームに格納
column_names = ['Column1', 'Column2', 'Column3']
df = pd.read_csv('input.csv', header=None, names=column_names)
conn = get_connection()
success, nchunks, nrows, _ = write_pandas(conn, df, 'TABLE_NAME')
if __name__ == "__main__":
main()
Snowflakeのアクセス情報は環境変数に設定しています。
カラム名(’Column1′, ‘Column2’, ‘Column3’)やテーブル名(’TABLE_NAME’)は取込先にあわせて変更してください。
コード解説
必要なモジュールのインポート
import os
import pandas as pd
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas
from dotenv import load_dotenv
os
環境変数の操作に使用されます。pandas
データ操作と解析のためのライブラリ。snowflake.connector
Snowflakeデータベースに接続するためのライブラリ。write_pandas
PandasデータフレームをSnowflakeテーブルに書き込むための関数。dotenv
環境変数を設定ファイルから読み込むためのライブラリ。
環境変数の読込み
.env
ファイルから環境変数を読み込みます。
load_dotenv(override=True)
.envファイルには、以下のようにSnowflakeの接続情報を記述します。
SNOWFLAKE_USER=xxxxxxxxxxxxxxxxxxxxxx
SNOWFLAKE_PASSWORD=xxxxxxxxxxxxxxxxxxxxxx
SNOWFLAKE_ACCOUNT=xxxxxxxxxxxxxxxxxxxxxx
SNOWFLAKE_WAREHOUSE=xxxxxxxxxxxxxxxxxxxxxx
SNOWFLAKE_DATABASE=xxxxxxxxxxxxxxxxxxxxxx
SNOWFLAKE_SCHEMA=xxxxxxxxxxxxxxxxxxxxxx
Snowflakeへの接続を確立する関数
def get_connection():
conn = snowflake.connector.connect(
user=os.environ['SNOWFLAKE_USER'],
password=os.environ['SNOWFLAKE_PASSWORD'],
account=os.environ['SNOWFLAKE_ACCOUNT'],
warehouse=os.environ['SNOWFLAKE_WAREHOUSE'],
database=os.environ['SNOWFLAKE_DATABASE'],
schema=os.environ['SNOWFLAKE_SCHEMA']
)
return conn
get_connection
関数は、環境変数からSnowflakeの接続情報を取得し、Snowflakeへの接続を確立して接続オブジェクトを返します。
メイン処理
def main():
# ヘッダーなし、3カラムのcsvファイルのデータをデータフレームに格納
column_names = ['Column1', 'Column2', 'Column3']
df = pd.read_csv('input.csv', header=None, names=column_names)
conn = get_connection()
success, nchunks, nrows, _ = write_pandas(conn, df, 'TABLE_NAME')
main
関数は、CSVファイルinput.csv
を読み込み、3つのカラムColumn1
,Column2
,Column3
を持つデータフレームdf
を作成します。get_connection
関数を呼び出してSnowflakeへの接続を確立します。write_pandas
関数を使用して、データフレームdf
をTABLE_NAME
という名前のSnowflakeテーブルに書き込みます。戻り値は書き込みの成功ステータス、チャンク数、書き込まれた行数などです。
エントリーポイント
スクリプトが直接実行されたときに main
関数を呼び出します。
if __name__ == "__main__":
main()