はじめまして、Goのpackage import pathのためにgithubアカウントを短くしたymgytです。
ハウテレビジョンでは2018年5月15日にiOSアプリ外資就活ドットコムのリニューアル版をリリースいたしました。
このアプリのバックエンドをAWS Elastic Container Service(ESC)とGoを中心に作りました。本blogではその際にハマったところや意識したところを述べていきたいと思います。
構成

主に利用しているAWS関連のリソースは
- ALB(Application Load Balancer)
- Autoscaling Group(EC2)
- ECS
- Lambda
- CloudWatch(Logs, Alarm, Events)
- RDS
- Elasticache(redis)
AWSリソースの作成はterraformを利用しました。
terraformはvagrant,packer等を提供しているHashiCorpが作成したGo製のCLIツールです。
私はAWSのリソース管理にはCloudformationを利用しておりましたが、今Projectではterraformを用いて行うことになっており、terraform初挑戦でした.
Cloudformationを利用していた時は、AWS CLIやAWS SDKを用いてCloudformation APIを叩くことになりますが、この際の適用scriptは自分でつくる必要があります。丁寧にやろうとするといきなりCloudformationのstackを更新せずに、Change SetというAWS リソースへの変更を表現したデータを一度作成して、内容を確認した後にこれを適用する必要があります。また、各リソースのパラメータ毎に更新された場合にリソースが破棄=>再作成されるのか、動的に更新されるのかが決まっているのですが、このあたりを意識せずにterraform plan
を実行すればよいだけなのがすごく良かったです。
どんな設定fileでもそうですが、再利用性を考えた場合や後の変更箇所は変数化されていていき、結果変数の数が増えていきます。Cloudformationでは1 templateで利用できるパラメータ数が60という制限があり、適宜Mappingやroot templateを設けてtemplateを細分化して対応したりして対応していました。terraformでは variable.tf
fileを作成して変数を宣言し, terraform.tfvars
fileを作成して変数を明示的に管理できる仕組みが用意されており、templateと変数の分離/管理が行いやすかったです。ただし、terraformはAWSに特化したツールではないので、Cloudformationで利用できる組み込み関数(GetAZs
)やConditionといったものは利用できません。また各リソースのパラメータについての説明はterraform上のdocumentは説明が簡素なので、場合によってはCloudformationのdocumentを参照する必要がでてきます。
Go製CLIツール
最後は個人的な好みになりますが、terraformがGoで書かれているので+5億点でした。terraformの中では、HashiCorp創業者であるMitchell Hashimoto氏が作成された github.com/mitchellh/cli
やgithub.com/mitchellh/panicwrap
等が利用されており、読んでいてとても勉強になります。
ECS
先日のAWS Summit Tokyo 2018でap-northeast-1 regionでのAWS Fargateの対応予定がアナウンスされましたが構築中の時期ではFargateはap-northeast-1では未対応でしたので、通常のEC2で作成しました。
Cluster
ECSではEC2 Instanceの集合をClusterという概念で捉え、CPU使用率やメモリ使用率といったリソースの使用状況もCluster単位で把握できる仕組みを提供してくれています。ただし、すべてをCluster単位で考えて、それぞれのEC2 Instanceのことは考えなくてよいかというと後述するポートの関係等でEC2 Instanceを意識せざるをえない場面もあります。最初にわかりづらかった点として、EC2 Instanceを作成するに際して、当該InstanceとECS上のなにかを対応づける情報が一切ない点です.
Cluster作成のtf fileのsample
resource "aws_launch_configuration" "ecs" {
name_prefix = "ecs-${var.app_name}-"
image_id = "${var.image_id}"
instance_type = "${var.instance_type}"
iam_instance_profile = "${aws_iam_instance_profile.ecs.id}"
key_name = "${var.key_name}"
security_groups = ["${aws_security_group.ecs_node_sg.id}"]
user_data = "${data.template_file.ecs_config.rendered}"
associate_public_ip_address = false
lifecycle {
create_before_destroy = true
}
}
resource "aws_autoscaling_group" "ecs" {
vpc_zone_identifier = ["${data.aws_subnet.public_L.id}", "${data.aws_subnet.public_R.id}"]
name = "${var.app_name}-group-${var.env_name}"
min_size = "${var.asg_min_size}"
max_size = "${var.asg_max_sieze}"
health_check_grace_period = 300
health_check_type = "ELB"
desired_capacity = "${var.ec2_desired_capacity}"
launch_configuration = "${aws_launch_configuration.ecs.name}"
lifecycle {
create_before_destroy = true
}
tag {
key = "Name"
value = "${var.app_name}-node-${var.env_name}"
propagate_at_launch = true
}
tag {
key = "ClusterName"
value = "${aws_ecs_cluster.api.name}"
propagate_at_launch = true
}
}
resource "aws_iam_instance_profile" "ecs" {
name = "ecs-instance-profile-${var.app_name}-${var.env_name}"
path = "/"
role = "${aws_iam_role.ecs_instance_role.name}"
}
resource "aws_iam_role" "ecs_instance_role" {
name = "ecs_instance_role-${var.app_name}-${var.env_name}"
assume_role_policy = <<EOF
{
"Version": "2008-10-17",
"Statement": [
{
"Action": "sts:AssumeRole",
"Principal": {
"Service": "ec2.amazonaws.com"
},
"Effect": "Allow",
"Sid": ""
}
]
}
EOF
}
resource "aws_iam_policy" "ecs_instance_policy" {
name = "ecs_instance_policy-${var.app_name}-${var.env_name}"
policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"ec2:Describe*"
],
"Effect": "Allow",
"Resource": "*"
}
]
}
EOF
}
resource "aws_iam_role_policy_attachment" "ecs_instance_role_attach_manage" {
role = "${aws_iam_role.ecs_instance_role.name}"
policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonEC2ContainerServiceforEC2Role"
}
resource "aws_iam_role_policy_attachment" "ecs_instance_role_attach_custom" {
role = "${aws_iam_role.ecs_instance_role.name}"
policy_arn = "${aws_iam_policy.ecs_instance_policy.arn}"
}
tagやIAM Roleにecsを意識した設定はでてくるものの、Clusterの作成は通常のautoscale groupを利用したEC2 Instanceの作成とかわりません。 autoscale groupをclusterにしているのが、user_dataとして渡している起動scriptでおこなわれるecs agent設定です。
user_data.sh
#!/bin/bash
# Install JQ JSON parser
yum install -y jq aws-cli
# Get the current region and instance_id from the instance metadata
region=$(curl -s http://169.254.169.254/latest/dynamic/instance-identity/document | jq -r .region)
instance_id=$(curl -s http://169.254.169.254/latest/meta-data/instance-id)
# Fetch ECS cluster name from tag
cluster_name=$(aws ec2 describe-instances --region $region --instance-ids $instance_id | jq '.Reservations[0].Instances[0].Tags | from_entries | .ClusterName' -r)
echo ECS_CLUSTER=$cluster_name >> /etc/ecs/ecs.config
echo ECS_AVAILABLE_LOGGING_DRIVERS=[\"json-file\",\"syslog\",\"fluentd\",\"awslogs\"] >> /etc/ecs/ecs.config
echo ECS_UPDATES_ENABLED=true >> /etc/ecs/ecs.config
echo ECS_ENGINE_TASK_CLEANUP_WAIT_DURATION=1h >> /etc/ecs/ecs.config
echo ECS_CONTAINER_STOP_TIMEOUT=5m >> /etc/ecs/ecs.config
AMIはECS-Optimized AMIを利用しているので、ecs-agentはpreinstallされています.ecs agentは /etc/ecs/ecs.config
に従って機能するので、起動時に必要なパラメータを設定します。この処理のおかげてECS上のCluterという単位でEC2 Instance群を捉えられるようになります。 FargateとEC2 Instanceを抽象化するためか、ECS上ではEC2 InstanceはECS Container InstanceとしてAPIのパラメータやdocument上で扱われます。
Service
Clusterを定義した後は、Serviceを作成します・
Serviceは最初はわかりづらい概念ですが、私はコンテナ群とロードバランサーを紐付ける概念上のモデルと捉えています。ServiceのパラメータでlaunchType(FARGATE,EC2)を設定したり、taskDefinition(docker-componse.yml)やコンテナ数(task数)を設定します。
Deploy
ECSにおけるDeployは、Serviceのtask definitionの更新なのでdeploy自体は下記のコマンドで実行しています
elb_target_group_arn=$(aws elbv2 describe-target-groups --names ${ELB_TARGET_NAME} | jq -r '.TargetGroups[].TargetGroupArn')
ecs-cli compose --project-name ${PROJECT_NAME} --task-role-arn ecs_task_role-${PROJECT_NAME} service up --container-name api --container-port 80 --target-group-arn ${elb_target_group_arn}
ecs-cliがdocker-compose.yml fileをtaskDefinitionとして扱ってくれるのでdocker imageを更新したうえでecs-cli service up
を叩くとdeployが走ります。 ports: 0:80
は動的ポートを設定しており、コンテナをスタートするとホスト上で動的に確保されたポートをALBのtarget groupに登録してくれます。この設定によって、1台のEC2 Instanceに複数のtaskを配置できEC2 Instanceのリソースを有効に使い切れます。 Client -> ALB(443) -> EC2 Instance(32777) -> Docker Container(80)というイメージです。
ecs-cliはaws-cliとは別にawsによって管理されており、こちらもGo製です。
version: '2'
services:
api:
image: ${IMAGE_TAG_API}
mem_limit: ${MEM_LIMIT}
ports:
- "0:80"
environment:
- APP_ENV=${APP_ENV}
command: /usr/local/bin/api-runner
logging:
driver: fluentd
options:
fluentd-address: ${FLUENTD_ADDRESS}
tag: apiv2.{{.Name}}.{{.ID}}
Autoscaling
ECSのAutoscaling、特にlaunch typeをEC2に設定している場合,autoscalingさせるには2つの設定が必要です。それがServiceのscalingとClusterのscalingです.
アプリケーションの負荷状況に応じて配備しているtask数を動的に変更していくのがServiceのautoscaling, Serviceが利用できるCPU/メモリはClusterとして登録されているEC2 InstanceのCPU/メモリの合計なので、これを超えてtaskを配置するためのClusterのautoscalingがそれぞれ必要になります。
Service
resource "aws_iam_role" "ecs_autoscale_role" {
name = "${var.app_name}-ecs-autoscale-role"
assume_role_policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "application-autoscaling.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
EOF
}
resource "aws_iam_role_policy_attachment" "ecs_autoscale_role_attach" {
role = "${aws_iam_role.ecs_autoscale_role.name}"
policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonEC2ContainerServiceAutoscaleRole"
}
resource "aws_appautoscaling_target" "main" {
max_capacity = 20
min_capacity = 2
resource_id = "service/${aws_ecs_cluster.api.name}/${var.app_name}-${var.env_name}"
role_arn = "${aws_iam_role.ecs_autoscale_role.arn}"
scalable_dimension = "ecs:service:DesiredCount"
service_namespace = "ecs"
}
resource "aws_appautoscaling_policy" "scale_out" {
name = "${aws_ecs_cluster.api.name}-scale-out"
resource_id = "service/${aws_ecs_cluster.api.name}/${var.app_name}-${var.env_name}"
scalable_dimension = "ecs:service:DesiredCount"
service_namespace = "ecs"
step_scaling_policy_configuration {
adjustment_type = "ChangeInCapacity"
cooldown = 60
metric_aggregation_type = "Average"
step_adjustment {
metric_interval_lower_bound = 0
scaling_adjustment = 1
}
}
depends_on = ["aws_appautoscaling_target.main"]
}
resource "aws_appautoscaling_policy" "scale_in" {
name = "${aws_ecs_cluster.api.name}-scale-in"
resource_id = "service/${aws_ecs_cluster.api.name}/${var.app_name}-${var.env_name}"
scalable_dimension = "ecs:service:DesiredCount"
service_namespace = "ecs"
step_scaling_policy_configuration {
adjustment_type = "ChangeInCapacity"
cooldown = 60
metric_aggregation_type = "Average"
step_adjustment {
metric_interval_upper_bound = 0
scaling_adjustment = -1
}
}
depends_on = ["aws_appautoscaling_target.main"]
}
resource "aws_cloudwatch_metric_alarm" "ecs_service_cpu_utilization_high" {
alarm_name = "ecs-service-${var.app_name}-cpu-high-alarm"
comparison_operator = "GreaterThanOrEqualToThreshold"
evaluation_periods = 1
metric_name = "CPUUtilization"
namespace = "AWS/ECS"
period = "180"
statistic = "Average"
threshold = "70"
actions_enabled = true
treat_missing_data = "missing"
alarm_description = "ecs service"
dimensions {
ClusterName = "${aws_ecs_cluster.api.name}"
ServiceName = "${var.app_name}-${var.env_name}"
}
alarm_actions = ["${aws_appautoscaling_policy.scale_out.arn}"]
insufficient_data_actions = []
ok_actions = []
depends_on = ["aws_appautoscaling_policy.scale_out"]
}
resource "aws_cloudwatch_metric_alarm" "ecs_service_cpu_utilization_low" {
alarm_name = "ecs-service-${var.app_name}-cpu-low-alarm"
comparison_operator = "LessThanThreshold"
evaluation_periods = 1
metric_name = "CPUUtilization"
namespace = "AWS/ECS"
period = "3600"
statistic = "Average"
threshold = "5"
actions_enabled = true
treat_missing_data = "missing"
alarm_description = "ecs service"
dimensions {
ClusterName = "${aws_ecs_cluster.api.name}"
ServiceName = "${var.app_name}-${var.env_name}"
}
alarm_actions = ["${aws_appautoscaling_policy.scale_in.arn}"]
insufficient_data_actions = []
ok_actions = []
depends_on = ["aws_appautoscaling_policy.scale_in"]
}
このように、appautoscaling policyを作成、cloudwatch alarmを作成して、metricの閾値に応じてactionを実行する形でautoscalingさせるよう設定しました。ただしこの方法ですと、scale downさせるためにもcloudwatch alarmを設定しなくてはならず、問題がないのもかかわらずalarmが発火してしまうので、他のalarm運用と衝突してしまいかねないので、なにかいい方法がないか探しているところでもあります。
Cluster
resource "aws_autoscaling_policy" "scale_out" {
name = "autoscaling-${aws_ecs_cluster.api.name}-scale-out"
policy_type = "SimpleScaling"
adjustment_type = "ChangeInCapacity"
scaling_adjustment = 1
cooldown = 300
autoscaling_group_name = "${aws_autoscaling_group.ecs.name}"
}
resource "aws_autoscaling_policy" "scale_in" {
name = "autoscaling-${aws_ecs_cluster.api.name}-scale-in"
policy_type = "SimpleScaling"
adjustment_type = "ChangeInCapacity"
scaling_adjustment = -1
cooldown = 300
autoscaling_group_name = "${aws_autoscaling_group.ecs.name}"
}
resource "aws_cloudwatch_metric_alarm" "ecs_cluster_memory_reservation_high" {
alarm_name = "ecs-cluster-${aws_ecs_cluster.api.name}-memory-reservation-high-alarm"
comparison_operator = "GreaterThanOrEqualToThreshold"
evaluation_periods = 1
metric_name = "MemoryReservation"
namespace = "AWS/ECS"
period = "60"
statistic = "Average"
threshold = "70"
actions_enabled = true
treat_missing_data = "missing"
alarm_description = "ecs cluster memory reservation high"
dimensions {
ClusterName = "${aws_ecs_cluster.api.name}"
}
alarm_actions = ["${aws_autoscaling_policy.scale_out.arn}"]
insufficient_data_actions = []
ok_actions = []
}
resource "aws_cloudwatch_metric_alarm" "ecs_cluster_memory_reservation_low" {
alarm_name = "ecs-cluster-${aws_ecs_cluster.api.name}-memory-reservation-low-alarm"
comparison_operator = "LessThanThreshold"
evaluation_periods = 1
metric_name = "MemoryReservation"
namespace = "AWS/ECS"
period = "3600"
statistic = "Average"
threshold = "30"
actions_enabled = true
treat_missing_data = "missing"
alarm_description = "ecs cluster memory reservation low"
dimensions {
ClusterName = "${aws_ecs_cluster.api.name}"
}
alarm_actions = ["${aws_autoscaling_policy.scale_in.arn}"]
insufficient_data_actions = []
ok_actions = []
}
Cluster側も同様にautoscalingのscaling policyを作成し、監視するMetricに応じたcloudwatch alarmを作成して、scalingさせていきます。 こちらは通常のEC2 InstanceのAutoscaleと変わらないかと思います。ただし、ここでCluster側が一切ECSを意識しなくてよいことによる問題が生じます。それはclusterに属するEC2 Instanceがscale inする際に当該Instaceで起動しているContainerに関知しないことです。そこで、scale in時にhookを設定し、gracefulな終了処理を行います。
Terminate Hook Lambda

処理の概要としては、Autoscaleのscalein時のHookにSNS通知を設定し、Lambdaを起動します。Lambdaの中で通知情報から対象EC2 Instanceを取得し、当該EC2 Instance(ECSからはContainer Instace)の状態をECS APIを利用してDrainingに変更します。その後running状態のtask数を取得し、まだrunning状態のtaskがある場合には再度snsに自身を起動した通知を再度pushし、running状態のtaskが0の場合は、autoscale APIのcompleteLifeCycleAction APIを呼び出します。 LambdaはGoで実装しました。
terminating hook lambda
resource "aws_iam_role" "asg_lifecycle_hook_role" {
name = "asg-lifecycle-hook-${aws_ecs_cluster.api.name}"
assume_role_policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "autoscaling.amazonaws.com"
}
}
]
}
EOF
}
resource "aws_iam_role_policy_attachment" "asg_lifecycle_hook_role_attach" {
role = "${aws_iam_role.asg_lifecycle_hook_role.name}"
policy_arn = "arn:aws:iam::aws:policy/service-role/AutoScalingNotificationAccessRole"
}
resource "aws_autoscaling_lifecycle_hook" "terminating" {
name = "${aws_ecs_cluster.api.name}-terminating-hook"
lifecycle_transition = "autoscaling:EC2_INSTANCE_TERMINATING"
autoscaling_group_name = "${aws_autoscaling_group.ecs.name}"
default_result = "ABANDON"
heartbeat_timeout = 900
notification_target_arn = "${aws_sns_topic.asg_lifecycle_hook_terminating.arn}"
role_arn = "${aws_iam_role.asg_lifecycle_hook_role.arn}"
}
resource "aws_iam_role" "terminate_hook_lambda_role" {
name = "${var.app_name}-asg-terminate-hook-lambda-role"
assume_role_policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Action": "sts:AssumeRole",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Effect": "Allow"
}
]
}
EOF
}
resource "aws_iam_policy" "terminate_hook_lambda_policy" {
name = "${var.app_name}-asg-terminate-hook-policy"
path = "/"
description = "asg,cw,ec2,sns operations"
policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"autoscaling:CompleteLifecycleAction",
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents",
"ec2:DescribeInstances",
"ec2:DescribeInstanceAttribute",
"ec2:DescribeInstanceStatus",
"ec2:DescribeHosts",
"ecs:ListContainerInstances",
"ecs:SubmitContainerStateChange",
"ecs:SubmitTaskStateChange",
"ecs:DescribeContainerInstances",
"ecs:UpdateContainerInstancesState",
"ecs:ListTasks",
"ecs:DescribeTasks",
"sns:Publish",
"sns:ListSubscriptions"
],
"Effect": "Allow",
"Resource": "*"
}
]
}
EOF
}
resource "aws_iam_role_policy_attachment" "terminate_hook_lambda_basic_execution" {
role = "${aws_iam_role.terminate_hook_lambda_role.name}"
policy_arn = "${aws_iam_policy.terminate_hook_lambda_policy.arn}"
}
resource "aws_lambda_function" "terminating_hook" {
s3_bucket = "${var.lambda_s3_bucket}"
s3_key = "${lookup(var.lambda_s3_key, "asg-terminate-hook")}"
s3_object_version = "${lookup(var.lambda_s3_object_version, "asg-terminate-hook")}"
description = "make sure no task running when terminating cluster instance."
function_name = "${var.app_name}-asg-terminate-hook"
role = "${aws_iam_role.terminate_hook_lambda_role.arn}"
handler = "handler"
runtime = "go1.x"
memory_size = 128
timeout = 10
environment {
variables = {
LAMBDA_LOG_ENCODE = "console"
LAMBDA_AWS_REGION = "ap-northeast-1"
LAMBDA_LOG_LEVEL = -1
LAMBDA_EC2_TAG_KEY_CLUSTER_NAME = "ClusterName"
}
}
}
resource "aws_lambda_permission" "sns_invoke" {
statement_id = "${var.app_name}-allow-execute-from-sns"
action = "lambda:InvokeFunction"
function_name = "${aws_lambda_function.terminating_hook.function_name}"
principal = "sns.amazonaws.com"
source_arn = "${aws_sns_topic.asg_lifecycle_hook_terminating.arn}"
}
lambda
前述したとおり、AutoscalingはClusterの情報をもたないので、terminateされようとしているEC2 Instanceから所属しているClusterを取得する必要があります。約束としてTagにClusterNameを設定する運用をおこなっていますが、lambdaの中では、tagとuser dataのparseという2つのアプローチでClusterNameを取得しようとしています。
package main
import (
"bufio"
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"os"
"regexp"
"strconv"
"strings"
"time"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/autoscaling"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ecs"
"github.com/aws/aws-sdk-go/service/sns"
"github.com/davecgh/go-spew/spew"
"github.com/juju/errors"
"go.uber.org/zap"
)
const (
ConInsStatusActive = "ACTIVE"
ConInsStatusInactive = "INACTIVE"
ConInsStatusDraining = "DRAINING"
TaskStatusRunning = "RUNNING"
TaskStatusStopped = "STOPPED"
TaskStatusPending = "PENDING"
)
type App struct {
Service *Service
Options AppOptions
Log *zap.Logger
}
type AppOptions struct {
LogLevel int
LogEncode string
Region string
EC2InstanceTagKeyClusterName string
}
func newAppOptionsFromEnv() AppOptions {
logLevel, err := strconv.ParseInt(os.Getenv("LAMBDA_LOG_LEVEL"), 10, 0)
if err != nil {
logLevel = 0
}
logEncode := os.Getenv("LAMBDA_LOG_ENCODE")
if logEncode == "" {
logEncode = "console"
}
region := os.Getenv("LAMBDA_AWS_REGION")
if region == "" {
region = "ap-northeast-1"
}
clusterTag := os.Getenv("LAMBDA_EC2_TAG_KEY_CLUSTER_NAME")
if clusterTag == "" {
clusterTag = "ClusterName"
}
return AppOptions{
LogLevel: int(logLevel),
LogEncode: logEncode,
Region: region,
EC2InstanceTagKeyClusterName: clusterTag,
}
}
func NewApp() *App {
app, err := newApp(newAppOptionsFromEnv())
if err != nil {
fmt.Println("failed to init app", err.Error())
os.Exit(1)
}
return app
}
func newApp(o AppOptions) (*App, error) {
logger, err := GetLogger(
WithLoggingLevel(o.LogLevel),
WithEncoded(o.LogEncode),
)
if err != nil {
return nil, err
}
service, err := NewService(o.Region)
if err != nil {
return nil, err
}
return &App{
Service: service,
Log: logger,
Options: o,
}, nil
}
type Service struct {
EC2 *ec2.EC2
ECS *ecs.ECS
ASG *autoscaling.AutoScaling
SNS *sns.SNS
}
func NewService(region string) (*Service, error) {
sess, err := session.NewSession(&aws.Config{
Region: aws.String(region),
})
if err != nil {
return nil, err
}
return &Service{
EC2: ec2.New(sess),
ECS: ecs.New(sess),
ASG: autoscaling.New(sess),
SNS: sns.New(sess),
}, nil
}
func (a *App) Wait(ctx context.Context) {
deadline, ok := ctx.Deadline()
a.Log.Debug("wait", zap.Time("deadline", deadline))
if ok {
wakeup := deadline.Add(time.Second * -2)
if time.Now().Before(wakeup) {
a.Log.Debug("wait", zap.Time("before_wait", time.Now()))
select {
case <-time.After(time.Until(wakeup)):
}
a.Log.Debug("wait", zap.Time("after_wait", time.Now()))
}
}
}
func (a *App) PublishSNS(ctx context.Context, topicARN, message string) error {
const subject = "publish sns to invoke me again"
input := &sns.PublishInput{
Message: aws.String(message),
Subject: aws.String(subject),
TopicArn: aws.String(topicARN),
}
a.Wait(ctx)
output, err := a.Service.SNS.Publish(input)
if err != nil {
return errors.Trace(err)
}
a.Log.Info("publish_sns",
zap.String("msg", "successfully publish sns"),
zap.String("message_id", aws.StringValue(output.MessageId)))
return nil
}
func (a *App) CompleteASGLifecycleAction(e LifecycleEvent) error {
input := &autoscaling.CompleteLifecycleActionInput{
AutoScalingGroupName: aws.String(e.AutoScalingGroupName),
InstanceId: aws.String(e.EC2InstanceID),
LifecycleActionResult: aws.String("CONTINUE"),
LifecycleActionToken: aws.String(e.LifecycleActionToken),
LifecycleHookName: aws.String(e.LifecycleHookName),
}
_, err := a.Service.ASG.CompleteLifecycleAction(input)
if err != nil {
return errors.Trace(err)
}
a.Log.Info("comple_asg_lifecycle_hook",
zap.String("msg", "successfully complete lifecycle hook"))
return nil
}
func (a *App) getTasks(clusterName, containerInstanceARN, status string) ([]*ecs.Task, error) {
if clusterName == "" || containerInstanceARN == "" {
return nil, errors.Errorf("clusterName %s and/or containerInstanceARN %s is empty", clusterName, containerInstanceARN)
}
inputList := &ecs.ListTasksInput{
Cluster: aws.String(clusterName),
ContainerInstance: aws.String(containerInstanceARN),
DesiredStatus: aws.String(status),
}
var taskARNs []*string
for {
outputList, err := a.Service.ECS.ListTasks(inputList)
if err != nil {
return nil, errors.Annotatef(err, "clusterName %q, containerInstanceARN %q", clusterName, containerInstanceARN)
}
taskARNs = append(taskARNs, outputList.TaskArns...)
if outputList.NextToken == nil {
break
}
inputList = inputList.SetNextToken(*outputList.NextToken)
}
if len(taskARNs) == 0 {
return nil, errors.NotFoundf("clusterName %q, containerInstanceARN %q", clusterName, containerInstanceARN)
}
var tasks []*ecs.Task
var n, m int
for {
n = m
m = m + 100
if m > len(taskARNs) {
m = len(taskARNs)
}
inputDesc := &ecs.DescribeTasksInput{
Cluster: aws.String(clusterName),
Tasks: taskARNs[n:m],
}
outputDesc, err := a.Service.ECS.DescribeTasks(inputDesc)
if err != nil {
return nil, errors.Annotatef(err, "min %d, max %d", n, m)
}
tasks = append(tasks, outputDesc.Tasks...)
if m >= len(taskARNs) {
break
}
}
return tasks, nil
}
func (a *App) GetRunningTasks(clusterName, ec2InstanceID string) ([]*ecs.Task, error) {
if clusterName == "" || ec2InstanceID == "" {
return nil, errors.Errorf("cluster_name %s and/or ec2_instance_id %s is empty", clusterName, ec2InstanceID)
}
containerInstance, err := a.getContainerInstance(clusterName, ec2InstanceID)
if err != nil {
return nil, errors.Trace(err)
}
containerInstanceARN := aws.StringValue(containerInstance.ContainerInstanceArn)
tasks, err := a.getTasks(clusterName, containerInstanceARN, TaskStatusRunning)
if err != nil {
if errors.IsNotFound(errors.Cause(err)) {
return nil, nil
}
return nil, errors.Trace(err)
}
return tasks, nil
}
func (a *App) getContainerInstance(clusterName, ec2InstanceID string) (*ecs.ContainerInstance, error) {
inputList := &ecs.ListContainerInstancesInput{
Cluster: aws.String(clusterName),
MaxResults: aws.Int64(100),
}
var containerARNs []*string
for {
outputList, err := a.Service.ECS.ListContainerInstances(inputList)
if err != nil {
return nil, errors.Annotatef(err, "clusterName %q, ec2InstanceID %q", clusterName, ec2InstanceID)
}
containerARNs = append(containerARNs, outputList.ContainerInstanceArns...)
if outputList.NextToken == nil {
break
}
inputList = inputList.SetNextToken(*outputList.NextToken)
}
log := a.Log.With(zap.String("cluster_name", clusterName), zap.String("ec2_instance_id", ec2InstanceID))
log.Debug("list_container_instance", zap.Strings("container_instance_arns", aws.StringValueSlice(containerARNs)))
if len(containerARNs) == 0 {
return nil, errors.NotFoundf("clusterName %q, ec2InstanceID %q", clusterName, ec2InstanceID)
}
inputDisc := &ecs.DescribeContainerInstancesInput{
Cluster: aws.String(clusterName),
ContainerInstances: containerARNs,
}
outputDisc, err := a.Service.ECS.DescribeContainerInstances(inputDisc)
if err != nil {
return nil, errors.Annotatef(err, "clusterName %q, ec2InstanceID %q", clusterName, ec2InstanceID)
}
target := func(outputDisc *ecs.DescribeContainerInstancesOutput) *ecs.ContainerInstance {
for _, conIns := range outputDisc.ContainerInstances {
if strings.Compare(ec2InstanceID, aws.StringValue(conIns.Ec2InstanceId)) == 0 {
return conIns
}
}
return nil
}(outputDisc)
if target == nil {
return nil, errors.Errorf("failed to fetch container instances from cluster %q, ec2_instance_id %q", clusterName, ec2InstanceID)
}
return target, nil
}
func (a *App) ChangeInstanceStatus(clusterName, containerInstanceARN, status string) error {
if clusterName == "" || containerInstanceARN == "" {
return errors.Errorf("clusterName %s and/or containerInstanceARN %s is empty", clusterName, containerInstanceARN)
}
input := &ecs.UpdateContainerInstancesStateInput{
Cluster: aws.String(clusterName),
ContainerInstances: aws.StringSlice([]string{containerInstanceARN}),
Status: aws.String(status),
}
output, err := a.Service.ECS.UpdateContainerInstancesState(input)
if err != nil {
return errors.Annotatef(err, "clusterName %q, containerInstanceARN %q, status %s", clusterName, containerInstanceARN, status)
}
failures := output.Failures
if len(failures) > 0 {
buf := new(bytes.Buffer)
for _, f := range output.Failures {
buf.WriteString(fmt.Sprintf("%s %s\n", aws.StringValue(f.Arn), aws.StringValue(f.Reason)))
}
return errors.Errorf("clusterName %q, containerInstanceARN %q, status %s\nreported failures %s",
clusterName, containerInstanceARN, status, strings.TrimRight(buf.String(), "\n"))
}
return nil
}
func (a *App) EnsureInstanceStatusDraining(clusterName, ec2InstanceID string) error {
if clusterName == "" || ec2InstanceID == "" {
return errors.Errorf("cluster_name %s and/or ec2_instance_id %s is empty", clusterName, ec2InstanceID)
}
containerInstance, err := a.getContainerInstance(clusterName, ec2InstanceID)
if err != nil {
return errors.Trace(err)
}
containerInstanceARN := aws.StringValue(containerInstance.ContainerInstanceArn)
status := aws.StringValue(containerInstance.Status)
log := a.Log.With(
zap.String("cluster_name", clusterName),
zap.String("ec2_instance_id", ec2InstanceID),
zap.String("container_instance_arn", containerInstanceARN),
zap.String("container_status", status))
switch status {
case ConInsStatusDraining:
log.Info("ensure_container_status", zap.String("msg", "status already draining"))
case ConInsStatusActive, ConInsStatusInactive:
log.Info("ensure_container_status", zap.String("msg", "change container status to draining"))
err := a.ChangeInstanceStatus(clusterName, containerInstanceARN, ConInsStatusDraining)
if err != nil {
return errors.Trace(err)
}
return a.EnsureInstanceStatusDraining(clusterName, ec2InstanceID)
default:
return errors.Errorf("unexpected container instance status %s", status)
}
return nil
}
var _clusterNameRegexp = regexp.MustCompile(`^[^#]*ECS_CLUSTER=(?P<name>[${}\w]+) ?.*$`)
func matchClusterName(line string) (clusterName string, ok bool) {
m := _clusterNameRegexp.FindStringSubmatch(line)
if len(m) == 2 {
clusterName, ok = m[1], true
}
return
}
func clusterNameFromECSConfig(ecsConfig []byte) (clusterName string, err error) {
s := bufio.NewScanner(bytes.NewReader(ecsConfig))
for s.Scan() {
clusterName, found := matchClusterName(s.Text())
if found {
return clusterName, nil
}
}
return "", s.Err()
}
func clusterNameFromUserData(encoded string) (clusterName string, err error) {
if encoded == "" {
return "", errors.New("encoded userdata is empty")
}
decoded, err := base64.StdEncoding.DecodeString(encoded)
if err != nil {
return "", errors.Annotatef(err, "encoded userdata: %s", encoded)
}
return clusterNameFromECSConfig(decoded)
}
func (a *App) fetchClusterNameFromUserData(ec2InstanceID string) (clusterName string, err error) {
input := &ec2.DescribeInstanceAttributeInput{
Attribute: aws.String("userData"),
InstanceId: aws.String(ec2InstanceID),
}
output, err := a.Service.EC2.DescribeInstanceAttribute(input)
if err != nil {
return "", errors.Annotatef(err, "ec2 instance id %s", ec2InstanceID)
}
if output.UserData == nil {
return "", errors.Errorf("DescribeInstanceAttributeOuteput.UserData is nil")
}
encodedUserData := aws.StringValue(output.UserData.Value)
clusterName, err = clusterNameFromUserData(encodedUserData)
if err != nil {
return "", errors.Trace(err)
}
log := a.Log.With(zap.String("ec2_instance_id", ec2InstanceID))
log.Info("fetch_cluster_name", zap.String("value", clusterName))
if strings.Contains(clusterName, "$") {
log.Info("fetch_cluster_name",
zap.String("msg", "discard invalid cluster name"),
zap.String("cluster_name", clusterName))
clusterName = ""
}
return
}
func tagValue(tags []*ec2.Tag, key string) string {
for _, tag := range tags {
if strings.Compare(key, aws.StringValue(tag.Key)) == 0 {
return aws.StringValue(tag.Value)
}
}
return ""
}
func (a *App) fetchClusterNameFromTag(ec2InstanceID string) (clusterName string, err error) {
input := &ec2.DescribeInstancesInput{
InstanceIds: aws.StringSlice([]string{ec2InstanceID}),
}
output, err := a.Service.EC2.DescribeInstances(input)
if err != nil {
return "", errors.Trace(err)
}
target := func(output *ec2.DescribeInstancesOutput) *ec2.Instance {
for _, rsv := range output.Reservations {
for _, ins := range rsv.Instances {
if strings.Compare(ec2InstanceID, aws.StringValue(ins.InstanceId)) == 0 {
return ins
}
}
}
return nil
}(output)
if target == nil {
return "", errors.Annotatef(err, "failed to fine ec2 instance %s", ec2InstanceID)
}
return tagValue(target.Tags, a.Options.EC2InstanceTagKeyClusterName), nil
}
func (a *App) FetchClusterName(ec2InstanceID string) (clusterName string, err error) {
ec2InstanceID = strings.TrimSpace(ec2InstanceID)
if ec2InstanceID == "" {
return "", errors.New("FetchClusterName: ec2InstanceID is empty")
}
clusterName, err = a.fetchClusterNameFromUserData(ec2InstanceID)
if err != nil {
return "", errors.Trace(err)
}
if clusterName != "" {
return clusterName, nil
}
clusterName, err = a.fetchClusterNameFromTag(ec2InstanceID)
return clusterName, errors.Trace(err)
}
type LifecycleEvent struct {
LifecycleHookName string `json:"LifecycleHookName"`
AccountID string `json:"AccountId"`
RequestID string `json:"RequestId"`
LifecycleTransition string `json:"LifecycleTransition"`
AutoScalingGroupName string `json:"AutoScalingGroupName"`
Service string `json:"Service"`
TimeRaw string `json:"Time"`
EC2InstanceID string `json:"EC2InstanceId"`
NotificationMetadataRaw string `json:"NotificationMetadata"`
LifecycleActionToken string `json:"LifecycleActionToken"`
NotificationMetadata map[string]interface{}
}
func NewLifecycleEventFromJSON(raw string) (LifecycleEvent, error) {
var event LifecycleEvent
if raw == "" {
return event, errors.New("NewLifecycleEventFromJSON: empty json")
}
err := json.Unmarshal([]byte(raw), &event)
if err != nil {
return event, errors.Errorf("NewLifecycleEventFromJSON: json unmarshal error: %s", err)
}
meta := event.NotificationMetadataRaw
if meta != "" {
event.NotificationMetadata = make(map[string]interface{})
if err := json.Unmarshal([]byte(meta), &event.NotificationMetadata); err != nil {
}
}
return event, nil
}
type EventValidator func(LifecycleEvent) error
func serviceIsAutoScaling(e LifecycleEvent) error {
const want = "AWS Auto Scaling"
if e.Service != want {
return errors.Errorf("LifecycleEvent.Service should be %q, but %q", want, e.Service)
}
return nil
}
func transitionIsTerminating(e LifecycleEvent) error {
const want = "autoscaling:EC2_INSTANCE_TERMINATING"
if e.LifecycleTransition != want {
return errors.Errorf("LifecycleEvent.LifecycleTransition should be %q, but %q", want, e.LifecycleTransition)
}
return nil
}
func ValidateEvent(event LifecycleEvent, fns ...EventValidator) error {
for _, f := range fns {
if err := f(event); err != nil {
return errors.Trace(err)
}
}
return nil
}
func (a *App) HandleRecord(ctx context.Context, record events.SNSEventRecord) error {
lcEvent, err := NewLifecycleEventFromJSON(record.SNS.Message)
if err != nil {
return errors.Trace(err)
}
a.Log.Debug("parse_lifecycle_event", zap.String("dump", spew.Sdump(lcEvent)))
err = ValidateEvent(lcEvent,
serviceIsAutoScaling,
transitionIsTerminating)
if err != nil {
return errors.Annotatef(err, "failed to validate lifecycle event")
}
clusterName, err := a.FetchClusterName(lcEvent.EC2InstanceID)
if err != nil {
return errors.Annotatef(err, "failed to fetch cluster name from ec2 instance %s", lcEvent.EC2InstanceID)
}
if clusterName == "" {
return errors.Errorf("failed to fetch cluster name from ec2 instance %s", lcEvent.EC2InstanceID)
}
log := a.Log.With(
zap.String("cluster_name", clusterName),
zap.String("ec2_instance_id", lcEvent.EC2InstanceID),
)
log.Info("fetch_cluster_name")
err = a.EnsureInstanceStatusDraining(clusterName, lcEvent.EC2InstanceID)
if err != nil {
errors.Annotatef(err, "ec2 instance: %s", lcEvent.EC2InstanceID)
}
runningTasks, err := a.GetRunningTasks(clusterName, lcEvent.EC2InstanceID)
if err != nil {
return errors.Trace(err)
}
log.Info("fetch_running_tasks", zap.Int("running_tasks_num", len(runningTasks)))
if len(runningTasks) == 0 {
log.Info("complete_asg_lifecycle_action",
zap.String("lifecycle_hook_name", lcEvent.LifecycleHookName),
zap.String("autoscaling_group_name", lcEvent.AutoScalingGroupName))
return a.CompleteASGLifecycleAction(lcEvent)
}
log.Info("publish_sns",
zap.String("topic_arn", record.SNS.TopicArn),
zap.String("message", record.SNS.Message))
return a.PublishSNS(ctx, record.SNS.TopicArn, record.SNS.Message)
}
func (a *App) HandleEvent(ctx context.Context, event events.SNSEvent) error {
a.Log.Info("start")
for _, record := range event.Records {
select {
case <-ctx.Done():
return ctx.Err()
default:
if err := a.HandleRecord(ctx, record); err != nil {
a.Log.Error("failed", zap.String("details", errors.Details(err)))
return err
}
}
}
a.Log.Info("success!")
return nil
}
func main() {
app := NewApp()
lambda.Start(app.HandleEvent)
}
Goのlambdaはbuildしたbinaryをzip化してS3に設置するだけなのでシンプルです。terraform側ではbucket/keyとversionIDを指定しています。
GOOS=linux GOARCH=amd64 go build -o _build/handler .
(cd _build && zip handler.zip handler)
aws s3 mv _build/handler.zip s3://backet/prefix/asg-terminate-hook/handler.zip
aws s3api head-object --bucket bucket --key prefix/asg-terminate-hook/handler.zip --query 'VersionId' --output=text
コンテナは動的に増減していくので、API作成時は後述する12factor app 9の 廃棄容易性を意識してすばやくshutdownできる必要があります。
GoでのAPI Server実装
利用したpackage
主に以下のpackageを利用しました
gin
http関連のhandlingはginを利用しました、requestをparseしてjsonを返すsimpleなAPIなので特に問題なく利用できまし。URLのroutingにはhttprouter(https://github.com/julienschmidt/httprouter)が利用されているので,URLが特殊な場合は事前に対応できるか確認が必要です。validate処理にはvalidator(https://github.com/go-playground/validator)を利用してvalidate処理を行いました
gorm
DB処理にはgorm(https://github.com/jinzhu/gorm)を利用しました。ORMというよりはSQL Builder的に利用しました。document(http://gorm.io/docs/)が充実しており、とても利用しやすかったです。
subqueryが必要な場合は明示的にsubqueryを生成する必要があるので、裏側でとんでもないSQLが生成されるといったこともなく、過度な抽象化は行わない方針で進めました。
zap
個人的にはGoのloggingについて、標準 packageの log
以外に決定版が存在していないと思っており,皆様がloggingをどのように行っているか気になっているところです。
今回は構造化されたloggingを行いたかったのでuber社が公開しているzap(https://github.com/uber-go/zap)を利用しました。
package layout
今回の一番の悩みどころがpackageのlayout(directory構成)でした。
Goの場合Webframeworkといった形で構成から命名規則まで統一的に管理するより各component単位でpackageを組み合わせ、できるだけ小さく保っていくアプローチがマッチしていると感じています。そこで、Clean Architectureの考えにのっとり,domain層(業務logic),httpに関連するcontroller層,データのCRUDを担当するrepository層の分離を意識したpackage構成にしました。自分自身、clean architectureの概念を勉強中で Clean Architecture: A Craftmans\'s Guide to Software Structure and Design(https://www.amazon.co.jp/dp/B075LRM681)や各種blog等を読ませていただき悪戦苦闘している状態です。
個人的には以下の点を満たせていればよいのではと思っております
- 業務logicを表現するlayerは他のlayerに依存せず,必要なサービスや機能はinterfaceで表現
- interfaceを実装するうえで共通のリソース(DB,検索engin,cache...)に依存する処理をpackageに落とし込む
- http関連の概念は担当layerでまとめて、layer間はできるだけprimitiveなデータとinterfaceだけの依存関係にする
12 factor app
Dockerを用いて開発したこともあり、12 factor app(https://12factor.net/)に適合するよう意識しました. 12 factor appとGoについては12 Factor Applications
with Docker and Go(https://www.amazon.co.jp/12-Factor-Applications-Docker-English-ebook/dp/B075HWVLMC)が参考になりました。
コードベース
githubを利用しているので、ここは特に意識しませんでした。
依存関係
Goのpackage管理にはdepを利用しました。
設定
設定は設定fileを用意することはせず、すべて環境変数から渡すようにしました。そのため設定値を管理するためにviper(https://github.com/spf13/viper)を利用したりはせず、直接環境変数を読んで構造体にbindするシンプルなものにしました。認証情報はAWS SecretsManager上に保持するようにしました。
バックエンドサービス
前述しました、clean architectureにそっていれば自然と外部サービスへの依存はinterfaceとして切り出されているはずなので、特に意識することなく環境変数を通じた切り替えができました。
ビルド, リリース, 実行
この箇所は自分の理解が曖昧なのですが、リリースを一意に識別し、ロールバックする点については、ECRを通じたimage管理で実現できていると思っています。
プロセス
単純なAPI Serverだったので特に意識せずにstatelessかつshared-nothingが実現できたのではないでしょうか。
ポートバインディング
Goを利用していれば net/http
packageのおかげて自然とport bindingまで完結されるで特に意識しませんでした。
並行性
理解が曖昧ですが、share-nothingが満足されていれば、自然と満たせると考えております。
この考えを推し進めるとあまり多くのgoroutineを生成すべきでないという結論にいきつくのでしょうか。
廃棄容易性
廃棄容易性と仰々しいですが、signalをhandlingしてhttp serverをshutdownする処理を加えました。
ginのsampleを参考にしました(https://github.com/gin-gonic/gin/blob/master/examples/graceful-shutdown/graceful-shutdown/server.go)
autoscalingの中でappの内容をできるだけ意識したくないので、起動/終了処理が数秒で完了することは運用面からとても重要な性質だと実感しています。
非同期で処理するジョブを抱え込んだりして、すぐに終了できなくなった場合は黄色信号で、Queue等を通じて処理を分散させる必要があります。
ログ
12factor appを読む前はログといえばログファイルに書き込んでローテションさせるものと思っており、app側が出力ストリームより向こう側を意識してはいけないという主張は極端だなと感じていました。ところが実際にapp側はSTDOUTに書き込むだけにすることで、logging関連処理をappから切り離し、ローテション処理やストレージ管理といった依存性もなくすことができました。
ログ管理については後述します。
管理プロセス
この考えは徹底できておらず、今後の改善が望まれる箇所です。
定期実行が必要な処理はlambda等に切り出し、DBや外部サービスの管理は別途オーケストレーションツール(chef,ansible)で行っています。
Log
logging処理の流れは, App -> fluentd -> bigquery
というシンプルな構成をとりました。
app
loggingに関しては12 factor ppの箇所でも述べた通り、app側ではSTDOUTへの書き込み以降は意識しないようになっており、出力先はtask definition(docker-componse.yml)で制御しています。
以下のようにlogging.dirverとしてfluentdを指定しています。logging driverにfluentdがサポートされているので、appの中ではfluentdに書き出していることも意識しないことができました.
version: '2'
services:
api:
image: ${IMAGE_TAG_API}
mem_limit: ${mem_limit}
ports:
- "0:80"
environment:
- APP_ENV=${APP_ENV}
command: /usr/local/bin/api-runner
logging:
driver: fluentd
options:
fluentd-address: ${FLUENTD_ADDRESS}
tag: apiv2.{{.Name}}.{{.ID}}
fluentd
fluentdについてもAPI Server同様、ECSで管理しています。
logがどのように利用されるかはlogging処理作成時点ではわかっていない/運用次第で変化しうる箇所なので、いったんaggreageする層が必要だと考えています。
fluentd imageについては公式を参考に以下のようなDockerfileになりました
FROM fluent/fluentd:v1.1.1-debian-onbuild
# install plugins
RUN buildDeps="sudo make gcc g++ libc-dev ruby-dev" \
&& apt-get update \
&& apt-get install -y --no-install-recommends $buildDeps \
&& sudo gem install \
fluent-plugin-bigquery:1.2.0 \
&& sudo gem sources --clear-all \
&& SUDO_FORCE_REMOVE=yes \
apt-get purge -y --auto-remove \
-o APT::AutoRemove::RecommendsImportant=false \
$buildDeps \
&& rm -rf /var/lib/apt/lists/* \
/home/fluent/.gem/ruby/2.3.0/cache/*.gem
# ONBUILDでcurrentのfluent.confは/fluentd/etc/fluent.confにCOPYされる
RUN mkdir -p /fluentd/etc/files
ADD files /fluentd/etc/files/
# -v -> debug
# -vv -> trace
ENV FLUENTD_OPT ""
CMD exec fluentd -c /fluentd/etc/fluent.conf -p /fluentd/plugins $FLUENTD_OPT
fluentdがAPI Serverの構成と異なる点は, ALB ListenerのprotocolをTCPにする関係で動的ポートの設定ができず、1Host-1Taskなってしまっている点です。この点は今後の改善点ですが現状ではfluend側の機能で吸収しきれています。また、fluentd側のlogDriverにはawslogsを設定し、そのままforwardすることで、Cloudwatchからlogを確認できるようにしています.
[
{
"name": "fluentd",
"image": "${image_url}",
"cpu": 0,
"memory": ${memory},
"portMappings": [
{
"hostPort": 24224,
"protocol": "tcp",
"containerPort": 24224
}
],
"logCOnfiguration": {
"logDriver": "awslogs",
"options": {
"awslogs-region": "${region}",
"awslogs-group": "{cw_logs_group}",
"awslogs-stream-prefix": "${cw_stream_prefix}"
}
},
"essential": true
}
]
bigquery
GCP Serviceのひとつであるbigqueryがlogの最終的な格納先です。
bigquery自体は既存システムが利用していた関係で採用されていますが、logを構造化して出力したい理由のひとつでした。
というのも、bigqueryは事前に定義されたschemaをもつので、loggingもアドホックにせず決められたkeyを出力する必要があったからです。
bigqueryへの入力はfluentdのfilter pluginを用いて、jsonをparseし、bigquery plugin(https://github.com/kaizenplatform/fluent-plugin-bigquery)を利用してinsertしました。logの分析がbigqueryを通じて行えるので,
SQLが使えるのがとてもありがたいです。
SELECT *
FROM <log_table>
WHERE
status_code != 200
AND
path = "/version/endpoint"
AND
timestamp BETWEEN <FROM> AND <TO>
SELECT path, AVG(latency) AS latency_avg
FROM <log_table>
GROUP BY path
運用
監視
外形監視
外形監視についてはMackerel(https://mackerel.io/) を利用しております。
Mackerelはsimpleで使いやすくUIも洗練されていて,とてもすばらしいサービスだと思います。(もちろん(CLI)https://mackerel.io/ja/docs/entry/advanced/cli)ただし、リクエスト間隔は1分で固定なので注意が必要です。
EC2 InstanceのMetricはECSを通じて収集しているので、mackerel-agent(https://github.com/mackerelio/mackerel-agent)は利用していないのですが、Go製でServerのMetricとはどうやって収集するのか興味があるので、ソースを読んで勉強させてもらいたいと思っています。
Metric監視
CloudWatch AlarmをECSの各種Metricに設定し,SNS経由でpagerdutyに集約するようにしています。
ecs-agent監視
ecs-agentについてはECS API DescribeContainerInstances
からAgentConnectedを取得し、接続を確認できない場合はSNSに通知するLambdaをCloudWatch
Eventで定期実行しています。
fluentd監視
fluentdの監視についてはCloudWatch LogGroupを作成し、log metric filterを作成し、シンプルにlog中のerrorを拾ってMetricに変換しています。bigqueryのinsertに関してはfluentdのlogGroupに対してlog subscriptionを設定し、lambdaを呼び出しています。lambdaの中でfluentdのlogをparseし、CloudWatchのPutMetrics APIを呼び出しています。
resource "aws_cloudwatch_log_subscription_filter" "watch_fluentd" {
name = "fluentd_status_logfilter"
log_group_name = "${aws_cloudwatch_log_group.log.name}"
filter_pattern = "${var.fluentd_subscription_filter_pattern}"
destination_arn = "${aws_lambda_function.watch_fluentd.arn}"
distribution = "ByLogStream"
depends_on = ["aws_lambda_permission.allow_cw_invoke_lambda"]
}
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"strconv"
"strings"
ac "github.com/howtv/go-awsctr"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/juju/errors"
"github.com/koron/go-dproxy"
"go.uber.org/zap"
)
type App struct {
Service *Service
Options AppOptions
Log *zap.Logger
}
type AppOptions struct {
LogLevel int
LogEncode string
}
func newAppOptionsFromEnv() AppOptions {
logLevel, err := strconv.ParseInt(os.Getenv("LAMBDA_LOG_LEVEL"), 10, 0)
if err != nil {
logLevel = 0
}
logEncode := os.Getenv("LAMBDA_LOG_ENCODE")
if logEncode == "" {
logEncode = "console"
}
return AppOptions{
LogLevel: int(logLevel),
LogEncode: logEncode,
}
}
func NewApp() *App {
app, err := newApp(newAppOptionsFromEnv())
if err != nil {
fmt.Println("failed to init app", err.Error())
os.Exit(1)
}
return app
}
func newApp(o AppOptions) (*App, error) {
logger, err := GetLogger(
WithLoggingLevel(o.LogLevel),
WithEncoded(o.LogEncode),
)
if err != nil {
return nil, err
}
service, err := NewService(o)
if err != nil {
return nil, err
}
return &App{
Service: service,
Log: logger,
Options: o,
}, nil
}
type Service struct {
Aws *awsResource
}
func NewService(o AppOptions) (*Service, error) {
awsRsc := newAWSResource()
return &Service{
Aws: awsRsc,
}, nil
}
type awsResource struct {
cw ac.CloudWatch
}
func newAWSResource() *awsResource {
sess := ac.NewSession("ap-northeast-1")
return &awsResource{
cw: ac.NewCloudWatch(sess),
}
}
type FluentStatus struct {
BufferQueueLength string
BufferTotalQueuedSize string
RetryCount string
}
func ParseMessage(msg string) ([]ac.CountMetricsInfo, error) {
msgs := strings.SplitN(msg, " ", 5)
m := strings.Join(msgs[4:], "")
var fluentStatus interface{}
err := json.Unmarshal([]byte(m), &fluentStatus)
if err != nil {
return nil, err
}
f := dproxy.New(fluentStatus)
pluginIDValue, _ := f.M("plugin_id").String()
bufferQueueLength, _ := f.M("buffer_queue_length").Float64()
bufferTotalQueueSize, _ := f.M("buffer_total_queued_size").Float64()
retryCount, _ := f.M("retry_count").Float64()
var info []ac.CountMetricsInfo
info = []ac.CountMetricsInfo{
ac.CountMetricsInfo{
NameSpace: "fluentd",
DimensionName: "PluginId",
DimensionValue: pluginIDValue,
MetricName: "buffer_queue_length",
Value: bufferQueueLength,
},
ac.CountMetricsInfo{
NameSpace: "fluentd",
DimensionName: "PluginId",
DimensionValue: pluginIDValue,
MetricName: "buffer_total_queued_size",
Value: bufferTotalQueueSize,
},
ac.CountMetricsInfo{
NameSpace: "fluentd",
DimensionName: "PluginId",
DimensionValue: pluginIDValue,
MetricName: "retry_count",
Value: retryCount,
},
}
return info, nil
}
func (a *App) HandleLogEvent(e events.CloudwatchLogsLogEvent) error {
a.Log.Debug("handle_log_event",
zap.String("id", e.ID),
zap.Int64("timestamp", e.Timestamp),
zap.String("message", e.Message),
)
metrics, err := ParseMessage(e.Message)
if err != nil {
return errors.Annotatef(err, "failed to parse event message %s", e.Message)
}
for _, m := range metrics {
err = a.Service.Aws.cw.PutCountMetrics(m)
if err != nil {
return errors.Annotatef(err, "failed to put metrics %#v", m)
}
}
a.Log.Info("put_metrics", zap.String("msg", "put log message successfully"))
return nil
}
func (a *App) parseRawData(raw events.CloudwatchLogsRawData) (events.CloudwatchLogsData, error) {
return raw.Parse()
}
func (a *App) HandleEvent(ctx context.Context, event events.CloudwatchLogsEvent) error {
log.Println("start")
data, err := a.parseRawData(event.AWSLogs)
if err != nil {
return errors.Annotatef(err, "failed to parse event raw data")
}
a.Log.Info("handle_event",
zap.String("log_group", data.LogGroup),
zap.String("log_stream", data.LogStream),
zap.Strings("subscription_filters", data.SubscriptionFilters),
zap.String("message_type", data.MessageType),
zap.Int("log_events_num", len(data.LogEvents)))
for _, logEvent := range data.LogEvents {
select {
case <-ctx.Done():
return ctx.Err()
default:
if err := a.HandleLogEvent(logEvent); err != nil {
a.Log.Error("failed", zap.String("details", errors.Details(err)))
return err
}
}
}
log.Println("success")
return nil
}
func main() {
app := NewApp()
lambda.Start(app.HandleEvent)
}
まとめ
- terraformはcloudformationより使いやすかった
- ECSはlaunch typeをEC2にする限り、ClusterとService2つのautoscalingの設定が必要
- Clusterをscale inする際は自前でgracefulな処理を作らないといけない
- 12 factor appに従うようにしておくと、アプリをコンテナ化して運用しやすい
- Goは楽しい
お知らせ
ハウテレビジョンでは、エンジニアを募集しています。AWS,Go,Dockerで一緒にシステムを作っていきませんか。ご連絡はこちら