S3に溜めたCloudTrailログを出力してみる

             図A システム構成図
#!/usr/bin/env python3.8
import boto3
import gzip
import io
import sys
import os
import traceback
import json

# Name of the S3 bucket that main() lists and downloads objects from.
target_bucket = 'trail-test'
# Key fragment selecting which objects to export; main() keeps an object
# when this string occurs anywhere in its key.
# NOTE(review): looks like the CloudTrail layout
# AWSLogs/<account-id>/CloudTrail/<region>/<yyyy>/<mm>/<dd> - confirm.
target_path = 'AWSLogs/123456789012/CloudTrail/ap-northeast-1/2024/07/22'

def main():
    """Export the gzipped JSON logs under ``target_path`` to ``22.json``.

    Lists every object in ``target_bucket``, keeps those whose key contains
    ``target_path``, downloads and gunzips each one, and writes every JSON
    document found (one per non-blank line) to ``22.json`` pretty-printed
    with a 2-space indent, one document after another.

    Any failure, malformed JSON included, is printed together with the
    exception class name and the process exits with status 1.
    """
    try:
        s3 = boto3.resource('s3')
        s3client = boto3.client('s3')
        my_bucket = s3.Bucket(target_bucket)
        decoder = json.JSONDecoder()
        # ``with`` closes (and flushes) the output file even when a download
        # or decode fails part-way through. UTF-8 is explicit because
        # ensure_ascii=False may emit non-ASCII characters.
        with open("22.json", "w", encoding="utf-8") as out:
            for summary in my_bucket.objects.all():
                # Substring match on the key: the whole bucket is listed and
                # filtered client-side.
                if target_path not in summary.key:
                    continue
                body = s3client.get_object(
                    Bucket=target_bucket,
                    Key=summary.key)['Body'].read()
                with gzip.open(io.BytesIO(body), 'rt', encoding="utf-8") as log:
                    for row in log:
                        text = row.strip()
                        if not text:
                            # A blank line holds no document; skip it
                            # instead of aborting the whole export.
                            continue
                        # raw_decode returns (document, end_offset); only the
                        # document belongs in the output.
                        document, _end = decoder.raw_decode(text)
                        out.write(
                            json.dumps(document, indent=2, ensure_ascii=False))
                        out.write("\n")
    except Exception as ex:
        # json.JSONDecodeError is a ValueError subclass, so decode failures
        # are reported here as well rather than being dropped silently.
        err_message = ex.__class__.__name__
        t = traceback.format_exception_only(type(ex), ex)
        print(t, err_message)
        sys.exit(1)

# Run the export only when this file is executed directly as a script.
if __name__ == '__main__':
    main()
#!/usr/bin/env python3.8
import boto3
import gzip
import io
import sys
import os
import traceback
import json

# Name of the S3 bucket that main() lists and downloads objects from.
target_bucket = 'trail-test'
# Key prefix selecting which objects to export; main() passes it as the
# Prefix filter when listing the bucket.
# NOTE(review): looks like the CloudTrail layout
# AWSLogs/<account-id>/CloudTrail/<region>/<yyyy>/<mm>/<dd> - confirm.
target_path = 'AWSLogs/123456789012/CloudTrail/ap-northeast-1/2024/07/22'

def main():
    """Export the gzipped JSON logs under ``target_path`` to ``22.json``.

    Lists the objects in ``target_bucket`` whose key starts with
    ``target_path`` (filtered by S3 via ``Prefix``), downloads and gunzips
    each one, and writes every JSON document found (one per non-blank line)
    to ``22.json`` pretty-printed with a 2-space indent, one document after
    another.

    Any failure, malformed JSON included, is printed together with the
    exception class name and the process exits with status 1.
    """
    try:
        s3 = boto3.resource('s3')
        s3client = boto3.client('s3')
        my_bucket = s3.Bucket(target_bucket)
        decoder = json.JSONDecoder()
        # ``with`` closes (and flushes) the output file even when a download
        # or decode fails part-way through. UTF-8 is explicit because
        # ensure_ascii=False may emit non-ASCII characters.
        with open("22.json", "w", encoding="utf-8") as out:
            for summary in my_bucket.objects.filter(Prefix=target_path):
                body = s3client.get_object(
                    Bucket=target_bucket,
                    Key=summary.key)['Body'].read()
                with gzip.open(io.BytesIO(body), 'rt', encoding="utf-8") as log:
                    for row in log:
                        text = row.strip()
                        if not text:
                            # A blank line holds no document; skip it
                            # instead of aborting the whole export.
                            continue
                        # raw_decode returns (document, end_offset); only the
                        # document belongs in the output.
                        document, _end = decoder.raw_decode(text)
                        out.write(
                            json.dumps(document, indent=2, ensure_ascii=False))
                        out.write("\n")
    except Exception as ex:
        # json.JSONDecodeError is a ValueError subclass, so decode failures
        # are reported here as well rather than being dropped silently.
        err_message = ex.__class__.__name__
        t = traceback.format_exception_only(type(ex), ex)
        print(t, err_message)
        sys.exit(1)

# Run the export only when this file is executed directly as a script.
if __name__ == '__main__':
    main()