S3バケットに保存されたログをCloudWatch Logsに転送する

目次

はじめに

AWS上でデータ基盤を構築するために、データウェアハウスの選択肢としてAmazon Redshiftクラスターを選択することは一般的であると思います。

Redshiftは下記3種類の監査ログをS3バケットに出力することが出来ますが、S3バケットに保存されたログを閲覧しようとすると、Amazon Athenaを使用してあらかじめ作成した外部テーブルに対してSQLクエリを実行する必要があります。あるいは、Amazon OpenSearch Serviceを使用してログ分析基盤を構築し、そこに取り込むというのも手かもしれません。

  • 接続ログ – 認証試行、接続、切断をログに記録します。
  • ユーザーログ – データベースのユーザー定義の変更に関する情報をログに記録します。
  • ユーザーアクティビティログ – 各クエリをデータベースで実行される前にログに記録します。

システムでログ分析基盤をしっかり持っていたり、AthenaやSQLクエリを自由に扱えるのであれば問題はありませんが、例えばそういったスキルセットを持たないシステム運用者がRedshiftの監査ログをもっとカジュアルに確認したい、という場合は、S3バケットに保存されたログをCloudWatch Logsに取り込んでマネージメントコンソール上でGUI操作で閲覧できると便利です。

また、Redshiftの監査ログをCloudWatch Logsに取り込むと、メトリクスフィルターを作成することでログ監視を行うこともできるようになり、より多くのシステム要件に対応することもできます。

本記事では、S3バケットに保存されたログをLambda関数でCloudWatch Logsに取り込むという流れを試してみます。

※やはり需要があったのか2022年4月にAWS公式でRedshiftからCloudWatch Logsへログ出力が出来るようになりました。本記事ではRedshiftのログ取り込みを例に挙げていますが、記事の内容は依然他のS3ログの取り込みに対しても有効な手法です。

Amazon Redshift が監査ログの新たな機能強化を発表

S3バケットのログをCloudWatch Logsに取り込む

CloudWatch Logsに取り込まれたログをS3バケットに保存するという流れであれば、サブスクリプションフィルターを使用してKinesis Data Firehoseへ流し込み、ログ整形用のLambda関数(Kinesis Data Firehose Cloudwatch Logs Processor)を使用する手法が良く用いられますが、その逆を行うことはAWS標準の機能では完結しません。

S3バケットに保存されたログをCloudWatch Logsに取り込むには、S3イベント通知と専用のLambda関数を使用します。 本記事では例としてS3バケットに保存されたRedshiftの監査ログを取り込み対象としますが、原理的には他のログにも応用可能です。

S3に保存されたログをCloudWatch Logsに取り込む流れ

S3イベント通知の設定

S3イベント通知を設定する詳細なステップは省略しますが、まずは、Redshiftが監査ログを出力するS3バケットの以下プレフィックスにファイルが保管されたら、Lambda関数が実行されるようイベント通知を設定します。

AWSLogs/AccountID/ServiceName/Region/Year/Month/Day/AccountID_ServiceName_Region_ClusterName_LogType_Timestamp.gz

S3バケットに保管されたファイルを判別する関数

def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'])
    print("Input Bucket : " + bucket)
    print("Input File Full Path : " + key)
    
    #get the file name from the key
    file_pattern1='redshift_useractivitylog_'
    file_pattern2='redshift_connectionlog_'
    file_pattern3='redshift_userlog_'
    file_name = key.split('/')[8]
    key_path=key.replace(file_name,'')

    if file_pattern1 in key:
    
        loggroup='useractivitylogs'
        logstream='useractivitylogstream'
    
        logtransfer(loggroup, logstream, bucket, key, file_name, key_path)
                    
    if file_pattern2 in key:

        loggroup='connectionlogs'
        logstream='connectionlogstream'
        
        logtransfer(loggroup, logstream, bucket, key, file_name, key_path)

    if file_pattern3 in key:

        loggroup='userlogs'
        logstream='userlogstream'
        
        logtransfer(loggroup, logstream, bucket, key, file_name, key_path)

    else:
        print("Skipping")

最初に実行されるlambda_handler関数で、S3イベント通知で連携されたログのファイル名を判別し、Redshiftの3種類の監査ログのどれであるか判別します。 判別したログは、それぞれのログ種別用にあらかじめ作成しておくCloudWatch Logsグループとストリームに転送するよう、ファイル名、S3パスと合わせてlogtransfer関数に渡されます。

ログの内容を展開してCloudWatch Logsに流し込む準備をする関数

def logtransfer(loggroup, logstream, bucket, key, file_name, key_path):
    print("Input File : " + file_name)
    print("Input File Path : " + key_path)

    res = logs_client.describe_log_streams(
        logGroupName=loggroup,
        logStreamNamePrefix=logstream,
    )
    seq_token = res['logStreams'][0].get('uploadSequenceToken', None)
    print("SequenceToken : " + str(seq_token))
    if (seq_token == None):

        res = logs_client.put_log_events(
            logGroupName=loggroup,
            logStreamName=logstream,
            logEvents=[
                {
                    'timestamp': int(time.time()) * 1000,
                    'message': "first attempt"
                },
            ]
        )
            
        res = logs_client.describe_log_streams(
            logGroupName=loggroup,
            logStreamNamePrefix=logstream,
        )
        seq_token = res['logStreams'][0]['uploadSequenceToken']
        print("SequenceToken : " + str(seq_token))

    else:

        res = logs_client.describe_log_streams(
            logGroupName=loggroup,
            logStreamNamePrefix=logstream,
        )
        seq_token = res['logStreams'][0]['uploadSequenceToken']
        print("SequenceToken : " + str(seq_token))

    with open('/tmp/'+file_name, 'wb') as f:
    
        #download the file
        s3.download_fileobj(bucket, key, f)
            
    #extract the content from gzip
    events = []
    i = 0
    with gzip.open('/tmp/'+file_name, 'rt', "utf_8") as fi:
        for line in fi:
                        
            i = i + 1

            ev = {'timestamp' : int(time.time()) * 1000, 'message' : line}
            events.append(ev)
            if (i == 1):
                print("First event : ")
                print(line)
            if (i % 500 == 0): # a higher value may result of a buffer overflow from Boto3 because you cant send more than 1MB of events in one call
                print("Send events " + str(i))
                res = streamevents(events, seq_token, loggroup, logstream)
                seq_token = res['nextSequenceToken']
                events = []
        streamevents(events, seq_token, loggroup, logstream)
                
        print("Last event : ")
        print(line)
        
        print(str(i) + " events sent to CloudWatch Logs")

CloudWatch Logsにログを転送する際は、PUTリクエストに1つ前のリクエストで表示されたSequenceTokenを含める必要があるため、SequenceTokenを取得する処理を行います。なぜSequenceTokenが必要なのか疑問があったためWebを調べると、真偽は不明ですが、AWS内部でログ取り込みを並行処理する際に、前回取り込まれたログと正しい順番で後続ログを取り込むことを保証するためではないかと推測されている方がいました。

普段コードを書かないため初回SequenceToken取得の流れが上手く作れていませんが、上記のコードでも用は足ります。

その後、同じ関数内で渡されたS3バケットの所定プレフィックスに保存されたログをgzip展開し、streamevents関数にログイベントとSequenceTokenをまとめて渡します。

CloudWatch Logsへログを転送する関数

def streamevents(events, sequenceToken, loggroup, logstream):
    kwargs = {
        'logGroupName':loggroup,
        'logStreamName':logstream,
        'logEvents':events,
    }
    if (sequenceToken != None):
        kwargs.update({'sequenceToken': sequenceToken})
    return logs_client.put_log_events(**kwargs)

ここまで出来れば、後はlogs_client.put_log_eventsで渡された情報を元にCloudWatch Logsにログを書き込むだけです。 なお、Lambda関数を実行するために必要なIAMポリシー・ロールや、出力先のCloudWatch Logsグループ・ストリームは別途作成しておきます。

IAMポリシーは、S3に対するGet, ListとCloudWatch LogsのPutLogEvents, DescribeLogStreams, CreateLogGroup, CreateLogStreamがあれば動作しました。

Lambda関数のコード全体

import json
import urllib
import boto3
import re
import gzip
import time

#s3 client
s3 = boto3.client('s3')

#log client
logs_client = boto3.client('logs')

def streamevents(events, sequenceToken, loggroup, logstream):
    kwargs = {
        'logGroupName':loggroup,
        'logStreamName':logstream,
        'logEvents':events,
    }
    if (sequenceToken != None):
        kwargs.update({'sequenceToken': sequenceToken})
    return logs_client.put_log_events(**kwargs)

def logtransfer(loggroup, logstream, bucket, key, file_name, key_path):
    print("Input File : " + file_name)
    print("Input File Path : " + key_path)

    res = logs_client.describe_log_streams(
        logGroupName=loggroup,
        logStreamNamePrefix=logstream,
    )
    seq_token = res['logStreams'][0].get('uploadSequenceToken', None)
    print("SequenceToken : " + str(seq_token))
    if (seq_token == None):

        res = logs_client.put_log_events(
            logGroupName=loggroup,
            logStreamName=logstream,
            logEvents=[
                {
                    'timestamp': int(time.time()) * 1000,
                    'message': "first attempt"
                },
            ]
        )
            
        res = logs_client.describe_log_streams(
            logGroupName=loggroup,
            logStreamNamePrefix=logstream,
        )
        seq_token = res['logStreams'][0]['uploadSequenceToken']
        print("SequenceToken : " + str(seq_token))

    else:

        res = logs_client.describe_log_streams(
            logGroupName=loggroup,
            logStreamNamePrefix=logstream,
        )
        seq_token = res['logStreams'][0]['uploadSequenceToken']
        print("SequenceToken : " + str(seq_token))

    with open('/tmp/'+file_name, 'wb') as f:
    
        #download the file
        s3.download_fileobj(bucket, key, f)
            
    #extract the content from gzip
    events = []
    i = 0
    with gzip.open('/tmp/'+file_name, 'rt', "utf_8") as fi:
        for line in fi:
                        
            i = i + 1

            ev = {'timestamp' : int(time.time()) * 1000, 'message' : line}
            events.append(ev)
            if (i == 1):
                print("First event : ")
                print(line)
            if (i % 500 == 0): # a higher value may result of a buffer overflow from Boto3 because you cant send more than 1MB of events in one call
                print("Send events " + str(i))
                res = streamevents(events, seq_token, loggroup, logstream)
                seq_token = res['nextSequenceToken']
                events = []
        streamevents(events, seq_token, loggroup, logstream)
                
        print("Last event : ")
        print(line)
        
        print(str(i) + " events sent to CloudWatch Logs")

def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'])
    print("Input Bucket : " + bucket)
    print("Input File Full Path : " + key)
    
    #get the file name from the key
    file_pattern1='redshift_useractivitylog_'
    file_pattern2='redshift_connectionlog_'
    file_pattern3='redshift_userlog_'
    file_name = key.split('/')[8]
    key_path=key.replace(file_name,'')

    if file_pattern1 in key:
    
        loggroup='useractivitylogs'
        logstream='useractivitylogstream'
    
        logtransfer(loggroup, logstream, bucket, key, file_name, key_path)
                    
    if file_pattern2 in key:

        loggroup='connectionlogs'
        logstream='connectionlogstream'
        
        logtransfer(loggroup, logstream, bucket, key, file_name, key_path)

    if file_pattern3 in key:

        loggroup='userlogs'
        logstream='userlogstream'
        
        logtransfer(loggroup, logstream, bucket, key, file_name, key_path)

    else:
        print("Skipping")

CloudWatch Logsに取り込んだログをCloudWatch Logs Insightsで閲覧

それでは、CloudWatch Logsに取り込んだログをCloudWatch Logs Insightsで閲覧しましょう。

Lambda関数の中できれいに監査ログをフィールドごとに整理出来ればLogs Insightsを使用する必要はありませんが、今回は平文のログをそのままCloudWatch Logsに転送しているため、閲覧時に専用のクエリ言語でログをフィールドごとに整形する必要があります。監査ログのフィールドの詳細はAWSのドキュメントに記載があります。

UseractivityLogs

parse @message "'* [ db=* user=* pid=* userid=* xid=* ]' LOG: *" as recordtime, db, user, pid, userid, xid, query
| sort recordtime asc
| filter user like /user01/

ConnectionLogs

parse @message "* |*|* |* |*|* |* |* |*|* |* |*|*|*|* |*|* |* |*|*" as event, recordtime, remotehost, remoteport, pid, dbname, username, authmethod, duration, sslversion, sslcipher, mtu, sslcompression, sslexpansion, iamauthguid, application_name, driver_version, os_version, plugin_name
| sort recordtime asc

Userlogs

parse @message "*|* |*|* |*|*|*|*|*|*|*" as userid, username, oldusername, action, usecreatedb, usesuper, usecatupd, valuntil, pid, xid, recordtime
| sort recordtime asc

結果

S3に保存されたRedshiftの監査ログをCloudWatch Logs Insightsで閲覧することが出来ました!

S3バケットに保存されたgzip圧縮のテキストログ(可読性が低い)

S3バケットに保存されたテキストログ

CloudWatch Logs Insightで整形されたログ

CloudWatch Logs Insightで整形されたログ

システム設計の仕方により、色々な方法でログ閲覧を行えますが、一つの手法として本記事が参考になれば幸いです。

元情報 / 参考にさせて頂いた記事

データベース監査ログ作成 - Amazon Redshift

A lambda function to stream Application Load Balancer logs dropped in S3 to CloudWatch Logs · GitHub

Lambda (Python) から特定のログストリームにログを書こうとして苦戦した2つのポイント - Qiita