【Snowflake】LambdaからSnowflakeへクエリを投げる

背景

SnowflakeへのクエリはSnowsight上やSnowSQLでしか試したことがなかったので、Pythonコネクタを利用してLambdaからSnowflakeへクエリを投げる方法を試してみる。

内容

今回はLambdaとLambda Layerで構成し、SAMでデプロイをしてみる。

準備するもの

./connect_snowflake
├── layer -- lambda layer用のディレクトリ
├── lambda_function.py -- ソースコード
├── samconfig.toml -- SAM用のconfigファイル
└── template.yaml -- SAM用のテンプレートファイル

各ファイルの中身は以下のようになっている。 LambdaからSnowflakeクエリを非同期で投げ、そのクエリIDを返却するものになっている。

lambda_function.py

import json
import time
import snowflake.connector
from snowflake.connector import ProgrammingError

def lambda_handler(event, context):

    user='' # 自身のユーザ名
    password='' # 自身のパスワード
    account='' # 自身のSnowflakeアカウント

    ctx = snowflake.connector.connect(
        user=user,
        password=password,
        account=account
        )
    cs = ctx.cursor()
    try:

        # クエリを非同期で投げる
        cs.execute_async("SELECT current_version();")
        query_id = cs.sfqid

        # クエリが実行完了するまで待機する
        while ctx.is_still_running(ctx.get_query_status_throw_if_error(query_id)):
            time.sleep(1)

    except ProgrammingError as err:
            print('Programming Error: {0}'.format(err))
            raise err
            
    finally:
        cs.close()
    ctx.close()

    return {
        'statusCode': 200,
        'body': json.dumps(query_id)
    }

template.yml

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Connect to Snowflake

Resources:
  ConnectSnowflakeFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ./
      Handler: lambda_function.lambda_handler
      Runtime: python3.10
      PackageType: Zip
      Architectures:
        - x86_64
      Layers:
        - !Ref Layer
      Timeout: 10

  Layer:
    Type: AWS::Serverless::LayerVersion
    Properties:
      ContentUri: layer/
      CompatibleRuntimes:
        - python3.10

Outputs:
  LambdaFunction:
    Description: "Connect Snowflake Lambda Function ARN"
    Value: !GetAtt ConnectSnowflakeFunction.Arn
  LmabdaFunctionIamRole:
    Description: "Implicit IAM Role created for Connect Snowflake Function"
    Value: !GetAtt ConnectSnowflakeFunctionRole.Arn

samconfig.toml

version = 0.1
[default]
[default.deploy]
[default.deploy.parameters]
stack_name = "connect-snowflake"
s3_bucket = "" # 自身のS3バケット名
s3_prefix = "connect-snowflake"
region = "ap-northeast-1"
capabilities = "CAPABILITY_IAM"

SAMでデプロイ

まず、Lambda Layerの作成を行う。 layerディレクトリ内にsnowflake-connector-pythonのパッケージをインストールする。

$ cd ./layer
$ docker run --rm -v $(pwd):/var/task public.ecr.aws/sam/build-python3.10:latest pip install "snowflake-connector-python" -t python/lib/python3.10/site-packages/

コマンドを実行するとlayerディレクトリ配下にsnowflake-connector-pythonのパッケージがインストールされる。

※注意したいのは、ビルド環境とLambdaの実行環境のアーキテクチャ(x86_64 or arm64)を合わせるようにすること。Macで開発している場合、パッケージはarmベースのものがインストールされるため、このライブラリをx86_64のLambdaで実行するとエラーが発生するので気を付けたい。

このエラーに関して自分が参考になった記事を貼っておく。

以下のコマンドでデプロイしていく。

$ cd ../
$ sam build 
$ sam deploy --guided 

デプロイ完了。SAMなので簡単!

Lambda上でテストが成功すると実行したクエリIDが表示される。

まとめ

Pythonコネクタを使えばLambdaからでも簡単にSnowflakeへクエリを投げるのも簡単にできる。