ハウテレビジョン開発者ブログ

『外資就活ドットコム』を日夜開発している技術陣がプログラミングネタ・業務改善ネタ・よしなしごとについて記していきます。

iOSアプリのバックエンドをAWS ECSとGoで作りました

はじめまして、Goのpackage import pathのためにgithubアカウントを短くしたymgytです。

 ハウテレビジョンでは2018年5月15日にiOSアプリ外資就活ドットコムのリニューアル版をリリースいたしました。 このアプリのバックエンドをAWS Elastic Container Service(ESC)とGoを中心に作りました。本blogではその際にハマったところや意識したところを述べていきたいと思います。

構成

f:id:yamaguchi7073xtt:20180709175852p:plain

主に利用しているAWS関連のリソースは

  • ALB(Application Load Balancer)
  • Autoscaling Group(EC2)
  • ECS
  • Lambda
  • CloudWatch(Logs, Alarm, Events)
  • RDS
  • Elasticache(redis)

Terraform

 AWSリソースの作成はterraformを利用しました。  terraformはvagrant,packer等を提供しているHashiCorpが作成したGo製のCLIツールです。

Cloudformationとの比較

 私はAWSのリソース管理にはCloudformationを利用しておりましたが、今Projectではterraformを用いて行うことになっており、terraform初挑戦でした. Cloudformationを利用していた時は、AWS CLIやAWS SDKを用いてCloudformation APIを叩くことになりますが、この際の適用scriptは自分でつくる必要があります。丁寧にやろうとするといきなりCloudformationのstackを更新せずに、Change SetというAWS リソースへの変更を表現したデータを一度作成して、内容を確認した後にこれを適用する必要があります。また、各リソースのパラメータ毎に更新された場合にリソースが破棄=>再作成されるのか、動的に更新されるのかが決まっているのですが、このあたりを意識せずにterraform plan を実行すればよいだけなのがすごく良かったです。  どんな設定fileでもそうですが、再利用性を考えた場合や後の変更箇所は変数化されていていき、結果変数の数が増えていきます。Cloudformationでは1 templateで利用できるパラメータ数が60という制限があり、適宜Mappingやroot templateを設けてtemplateを細分化して対応したりして対応していました。terraformでは variable.tf fileを作成して変数を宣言し, terraform.tfvarsfileを作成して変数を明示的に管理できる仕組みが用意されており、templateと変数の分離/管理が行いやすかったです。ただし、terraformはAWSに特化したツールではないので、Cloudformationで利用できる組み込み関数(GetAZs)やConditionといったものは利用できません。また各リソースのパラメータについての説明はterraform上のdocumentは説明が簡素なので、場合によってはCloudformationのdocumentを参照する必要がでてきます。

Go製CLIツール

 最後は個人的な好みになりますが、terraformがGoで書かれているので+5億点でした。terraformの中では、HashiCorp創業者であるMitchell Hashimoto氏が作成された github.com/mitchellh/cligithub.com/mitchellh/panicwrap 等が利用されており、読んでいてとても勉強になります。

ECS

 先日のAWS Summit Tokyo 2018でap-northeast-1 regionでのAWS Fargateの対応予定がアナウンスされましたが構築中の時期ではFargateはap-northeast-1では未対応でしたので、通常のEC2で作成しました。

Cluster

 ECSではEC2 Instanceの集合をClusterという概念で捉え、CPU使用率やメモリ使用率といったリソースの使用状況もCluster単位で把握できる仕組みを提供してくれています。ただし、すべてをCluster単位で考えて、それぞれのEC2 Instanceのことは考えなくてよいかというと後述するポートの関係等でEC2 Instanceを意識せざるをえない場面もあります。最初にわかりづらかった点として、EC2 Instanceを作成するに際して、当該InstanceとECS上のなにかを対応づける情報が一切ない点です.

Cluster作成のtf fileのsample
   resource "aws_launch_configuration" "ecs" {
     name_prefix                 = "ecs-${var.app_name}-"
     image_id                    = "${var.image_id}"
     instance_type               = "${var.instance_type}"
     iam_instance_profile        = "${aws_iam_instance_profile.ecs.id}"
     key_name                    = "${var.key_name}"
     security_groups             = ["${aws_security_group.ecs_node_sg.id}"]
     user_data                   = "${data.template_file.ecs_config.rendered}"
     associate_public_ip_address = false
   
     lifecycle {
       create_before_destroy = true 
     }
   }
   
   resource "aws_autoscaling_group" "ecs" {
     vpc_zone_identifier       = ["${data.aws_subnet.public_L.id}", "${data.aws_subnet.public_R.id}"]
     name                      = "${var.app_name}-group-${var.env_name}"
     min_size                  = "${var.asg_min_size}"
     max_size                  = "${var.asg_max_sieze}"
     health_check_grace_period = 300
     health_check_type         = "ELB"
     desired_capacity          = "${var.ec2_desired_capacity}"
     launch_configuration      = "${aws_launch_configuration.ecs.name}"
   
     lifecycle {
       create_before_destroy = true
     }
   
     tag {
       key                 = "Name"
       value               = "${var.app_name}-node-${var.env_name}"
       propagate_at_launch = true
     }
   
     tag {
       key                 = "ClusterName"
       value               = "${aws_ecs_cluster.api.name}"
       propagate_at_launch = true
     }
   }
   
   resource "aws_iam_instance_profile" "ecs" {
     name = "ecs-instance-profile-${var.app_name}-${var.env_name}"
     path = "/"
     role = "${aws_iam_role.ecs_instance_role.name}"
   }
   
   resource "aws_iam_role" "ecs_instance_role" {
     name = "ecs_instance_role-${var.app_name}-${var.env_name}"
   
     assume_role_policy = <<EOF
   {
     "Version": "2008-10-17",
     "Statement": [
       {
         "Action": "sts:AssumeRole",
         "Principal": {
           "Service": "ec2.amazonaws.com"
         },
         "Effect": "Allow",
         "Sid": ""
       }
     ]
   }
   EOF
   }
   
   resource "aws_iam_policy" "ecs_instance_policy" {
     name = "ecs_instance_policy-${var.app_name}-${var.env_name}"
   
     policy = <<EOF
   {
     "Version": "2012-10-17",
     "Statement": [
       {
         "Action": [
           "ec2:Describe*"
         ],
         "Effect": "Allow",
         "Resource": "*"
       }
     ]
   }
   EOF
   }
   
   resource "aws_iam_role_policy_attachment" "ecs_instance_role_attach_manage" {
     role       = "${aws_iam_role.ecs_instance_role.name}"
     policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonEC2ContainerServiceforEC2Role"
   }
   
   resource "aws_iam_role_policy_attachment" "ecs_instance_role_attach_custom" {
     role       = "${aws_iam_role.ecs_instance_role.name}"
     policy_arn = "${aws_iam_policy.ecs_instance_policy.arn}"
   }

tagやIAM Roleにecsを意識した設定はでてくるものの、Clusterの作成は通常のautoscale groupを利用したEC2 Instanceの作成とかわりません。 autoscale groupをclusterにしているのが、user_dataとして渡している起動scriptでおこなわれるecs agent設定です。

user_data.sh
#!/bin/bash

# Install JQ JSON parser
yum install -y jq aws-cli

# Get the current region and instance_id from the instance metadata
region=$(curl -s http://169.254.169.254/latest/dynamic/instance-identity/document | jq -r .region)
instance_id=$(curl -s http://169.254.169.254/latest/meta-data/instance-id)

# Fetch ECS cluster name from tag
cluster_name=$(aws ec2 describe-instances --region $region --instance-ids $instance_id | jq '.Reservations[0].Instances[0].Tags | from_entries | .ClusterName' -r)

echo ECS_CLUSTER=$cluster_name >> /etc/ecs/ecs.config
echo ECS_AVAILABLE_LOGGING_DRIVERS=[\"json-file\",\"syslog\",\"fluentd\",\"awslogs\"] >> /etc/ecs/ecs.config
echo ECS_UPDATES_ENABLED=true >> /etc/ecs/ecs.config
echo ECS_ENGINE_TASK_CLEANUP_WAIT_DURATION=1h >> /etc/ecs/ecs.config
echo ECS_CONTAINER_STOP_TIMEOUT=5m >> /etc/ecs/ecs.config

AMIはECS-Optimized AMIを利用しているので、ecs-agentはpreinstallされています.ecs agentは /etc/ecs/ecs.configに従って機能するので、起動時に必要なパラメータを設定します。この処理のおかげてECS上のCluterという単位でEC2 Instance群を捉えられるようになります。 FargateとEC2 Instanceを抽象化するためか、ECS上ではEC2 InstanceはECS Container InstanceとしてAPIのパラメータやdocument上で扱われます。

Service

 Clusterを定義した後は、Serviceを作成します・ Serviceは最初はわかりづらい概念ですが、私はコンテナ群とロードバランサーを紐付ける概念上のモデルと捉えています。ServiceのパラメータでlaunchType(FARGATE,EC2)を設定したり、taskDefinition(docker-componse.yml)やコンテナ数(task数)を設定します。

Deploy

ECSにおけるDeployは、Serviceのtask definitionの更新なのでdeploy自体は下記のコマンドで実行しています

elb_target_group_arn=$(aws elbv2 describe-target-groups --names ${ELB_TARGET_NAME} | jq -r '.TargetGroups[].TargetGroupArn')
ecs-cli compose --project-name ${PROJECT_NAME} --task-role-arn ecs_task_role-${PROJECT_NAME} service up --container-name api --container-port 80 --target-group-arn ${elb_target_group_arn}

ecs-cliがdocker-compose.yml fileをtaskDefinitionとして扱ってくれるのでdocker imageを更新したうえでecs-cli service up を叩くとdeployが走ります。 ports: 0:80は動的ポートを設定しており、コンテナをスタートするとホスト上で動的に確保されたポートをALBのtarget groupに登録してくれます。この設定によって、1台のEC2 Instanceに複数のtaskを配置できEC2 Instanceのリソースを有効に使い切れます。 Client -> ALB(443) -> EC2 Instance(32777) -> Docker Container(80)というイメージです。 ecs-cliはaws-cliとは別にawsによって管理されており、こちらもGo製です。

version: '2'
services:
  api:
    image: ${IMAGE_TAG_API}
    mem_limit: ${MEM_LIMIT}
    ports:
      - "0:80"
    environment:
       - APP_ENV=${APP_ENV}
    command: /usr/local/bin/api-runner
    logging:
      driver: fluentd
      options:
        fluentd-address: ${FLUENTD_ADDRESS}
        tag: apiv2.{{.Name}}.{{.ID}}

Autoscaling

 ECSのAutoscaling、特にlaunch typeをEC2に設定している場合,autoscalingさせるには2つの設定が必要です。それがServiceのscalingとClusterのscalingです. アプリケーションの負荷状況に応じて配備しているtask数を動的に変更していくのがServiceのautoscaling, Serviceが利用できるCPU/メモリはClusterとして登録されているEC2 InstanceのCPU/メモリの合計なので、これを超えてtaskを配置するためのClusterのautoscalingがそれぞれ必要になります。

Service
resource "aws_iam_role" "ecs_autoscale_role" {
  name = "${var.app_name}-ecs-autoscale-role"

  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "application-autoscaling.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
EOF
}

resource "aws_iam_role_policy_attachment" "ecs_autoscale_role_attach" {
  role       = "${aws_iam_role.ecs_autoscale_role.name}"
  policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonEC2ContainerServiceAutoscaleRole"
}

resource "aws_appautoscaling_target" "main" {
  max_capacity       = 20
  min_capacity       = 2
  resource_id        = "service/${aws_ecs_cluster.api.name}/${var.app_name}-${var.env_name}"
  role_arn           = "${aws_iam_role.ecs_autoscale_role.arn}"
  scalable_dimension = "ecs:service:DesiredCount"
  service_namespace  = "ecs"
}

resource "aws_appautoscaling_policy" "scale_out" {
  name               = "${aws_ecs_cluster.api.name}-scale-out"
  resource_id        = "service/${aws_ecs_cluster.api.name}/${var.app_name}-${var.env_name}"
  scalable_dimension = "ecs:service:DesiredCount"
  service_namespace  = "ecs"

  step_scaling_policy_configuration {
    adjustment_type         = "ChangeInCapacity"
    cooldown                = 60
    metric_aggregation_type = "Average"

    step_adjustment {
      metric_interval_lower_bound = 0
      scaling_adjustment          = 1
    }
  }

  depends_on = ["aws_appautoscaling_target.main"]
}

resource "aws_appautoscaling_policy" "scale_in" {
  name               = "${aws_ecs_cluster.api.name}-scale-in"
  resource_id        = "service/${aws_ecs_cluster.api.name}/${var.app_name}-${var.env_name}"
  scalable_dimension = "ecs:service:DesiredCount"
  service_namespace  = "ecs"

  step_scaling_policy_configuration {
    adjustment_type         = "ChangeInCapacity"
    cooldown                = 60
    metric_aggregation_type = "Average"

    step_adjustment {
      metric_interval_upper_bound = 0
      scaling_adjustment          = -1
    }
  }

  depends_on = ["aws_appautoscaling_target.main"]
}

resource "aws_cloudwatch_metric_alarm" "ecs_service_cpu_utilization_high" {
  alarm_name          = "ecs-service-${var.app_name}-cpu-high-alarm"
  comparison_operator = "GreaterThanOrEqualToThreshold"
  evaluation_periods  = 1
  metric_name         = "CPUUtilization"
  namespace           = "AWS/ECS"
  period              = "180"
  statistic           = "Average"
  threshold           = "70"
  actions_enabled     = true
  treat_missing_data  = "missing"
  alarm_description   = "ecs service"

  dimensions {
    ClusterName = "${aws_ecs_cluster.api.name}"
    ServiceName = "${var.app_name}-${var.env_name}"
  }

  alarm_actions             = ["${aws_appautoscaling_policy.scale_out.arn}"]
  insufficient_data_actions = []
  ok_actions                = []

  depends_on = ["aws_appautoscaling_policy.scale_out"]
}

resource "aws_cloudwatch_metric_alarm" "ecs_service_cpu_utilization_low" {
  alarm_name          = "ecs-service-${var.app_name}-cpu-low-alarm"
  comparison_operator = "LessThanThreshold"
  evaluation_periods  = 1
  metric_name         = "CPUUtilization"
  namespace           = "AWS/ECS"
  period              = "3600"
  statistic           = "Average"
  threshold           = "5"
  actions_enabled     = true
  treat_missing_data  = "missing"
  alarm_description   = "ecs service"

  dimensions {
    ClusterName = "${aws_ecs_cluster.api.name}"
    ServiceName = "${var.app_name}-${var.env_name}"
  }

  alarm_actions             = ["${aws_appautoscaling_policy.scale_in.arn}"]
  insufficient_data_actions = []
  ok_actions                = []

  depends_on = ["aws_appautoscaling_policy.scale_in"]
}

 このように、appautoscaling policyを作成、cloudwatch alarmを作成して、metricの閾値に応じてactionを実行する形でautoscalingさせるよう設定しました。ただしこの方法ですと、scale downさせるためにもcloudwatch alarmを設定しなくてはならず、問題がないのもかかわらずalarmが発火してしまうので、他のalarm運用と衝突してしまいかねないので、なにかいい方法がないか探しているところでもあります。

Cluster
resource "aws_autoscaling_policy" "scale_out" {
  name                   = "autoscaling-${aws_ecs_cluster.api.name}-scale-out"
  policy_type            = "SimpleScaling"
  adjustment_type        = "ChangeInCapacity"
  scaling_adjustment     = 1
  cooldown               = 300
  autoscaling_group_name = "${aws_autoscaling_group.ecs.name}"
}

resource "aws_autoscaling_policy" "scale_in" {
  name                   = "autoscaling-${aws_ecs_cluster.api.name}-scale-in"
  policy_type            = "SimpleScaling"
  adjustment_type        = "ChangeInCapacity"
  scaling_adjustment     = -1
  cooldown               = 300
  autoscaling_group_name = "${aws_autoscaling_group.ecs.name}"
}

resource "aws_cloudwatch_metric_alarm" "ecs_cluster_memory_reservation_high" {
  alarm_name          = "ecs-cluster-${aws_ecs_cluster.api.name}-memory-reservation-high-alarm"
  comparison_operator = "GreaterThanOrEqualToThreshold"
  evaluation_periods  = 1
  metric_name         = "MemoryReservation"
  namespace           = "AWS/ECS"
  period              = "60"
  statistic           = "Average"
  threshold           = "70"
  actions_enabled     = true
  treat_missing_data  = "missing"
  alarm_description   = "ecs cluster memory reservation high"

  dimensions {
    ClusterName = "${aws_ecs_cluster.api.name}"
  }

  alarm_actions             = ["${aws_autoscaling_policy.scale_out.arn}"]
  insufficient_data_actions = []
  ok_actions                = []
}

resource "aws_cloudwatch_metric_alarm" "ecs_cluster_memory_reservation_low" {
  alarm_name          = "ecs-cluster-${aws_ecs_cluster.api.name}-memory-reservation-low-alarm"
  comparison_operator = "LessThanThreshold"
  evaluation_periods  = 1
  metric_name         = "MemoryReservation"
  namespace           = "AWS/ECS"
  period              = "3600"
  statistic           = "Average"
  threshold           = "30"
  actions_enabled     = true
  treat_missing_data  = "missing"
  alarm_description   = "ecs cluster memory reservation low"

  dimensions {
    ClusterName = "${aws_ecs_cluster.api.name}"
  }

  alarm_actions             = ["${aws_autoscaling_policy.scale_in.arn}"]
  insufficient_data_actions = []
  ok_actions                = []
}

 Cluster側も同様にautoscalingのscaling policyを作成し、監視するMetricに応じたcloudwatch alarmを作成して、scalingさせていきます。 こちらは通常のEC2 InstanceのAutoscaleと変わらないかと思います。ただし、ここでCluster側が一切ECSを意識しなくてよいことによる問題が生じます。それはclusterに属するEC2 Instanceがscale inする際に当該Instaceで起動しているContainerに関知しないことです。そこで、scale in時にhookを設定し、gracefulな終了処理を行います。

Terminate Hook Lambda

f:id:yamaguchi7073xtt:20180707161543p:plain

処理の概要としては、Autoscaleのscalein時のHookにSNS通知を設定し、Lambdaを起動します。Lambdaの中で通知情報から対象EC2 Instanceを取得し、当該EC2 Instance(ECSからはContainer Instace)の状態をECS APIを利用してDrainingに変更します。その後running状態のtask数を取得し、まだrunning状態のtaskがある場合には再度snsに自身を起動した通知を再度pushし、running状態のtaskが0の場合は、autoscale APIのcompleteLifeCycleAction APIを呼び出します。 LambdaはGoで実装しました。

terminating hook lambda
resource "aws_iam_role" "asg_lifecycle_hook_role" {
  name = "asg-lifecycle-hook-${aws_ecs_cluster.api.name}"

  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Effect": "Allow",
      "Principal": {
        "Service": "autoscaling.amazonaws.com"
      }
    }
  ]
}
EOF
}

resource "aws_iam_role_policy_attachment" "asg_lifecycle_hook_role_attach" {
  role       = "${aws_iam_role.asg_lifecycle_hook_role.name}"
  policy_arn = "arn:aws:iam::aws:policy/service-role/AutoScalingNotificationAccessRole"
}

resource "aws_autoscaling_lifecycle_hook" "terminating" {
  name                    = "${aws_ecs_cluster.api.name}-terminating-hook"
  lifecycle_transition    = "autoscaling:EC2_INSTANCE_TERMINATING"
  autoscaling_group_name  = "${aws_autoscaling_group.ecs.name}"
  default_result          = "ABANDON"
  heartbeat_timeout       = 900
  notification_target_arn = "${aws_sns_topic.asg_lifecycle_hook_terminating.arn}"
  role_arn                = "${aws_iam_role.asg_lifecycle_hook_role.arn}"
}

resource "aws_iam_role" "terminate_hook_lambda_role" {
  name = "${var.app_name}-asg-terminate-hook-lambda-role"

  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Effect": "Allow"
    }
  ]
}
EOF
}

resource "aws_iam_policy" "terminate_hook_lambda_policy" {
  name        = "${var.app_name}-asg-terminate-hook-policy"
  path        = "/"
  description = "asg,cw,ec2,sns operations"

  policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "autoscaling:CompleteLifecycleAction",
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents",
        "ec2:DescribeInstances",
        "ec2:DescribeInstanceAttribute",
        "ec2:DescribeInstanceStatus",
        "ec2:DescribeHosts",
        "ecs:ListContainerInstances",
        "ecs:SubmitContainerStateChange",
        "ecs:SubmitTaskStateChange",
        "ecs:DescribeContainerInstances",
        "ecs:UpdateContainerInstancesState",
        "ecs:ListTasks",
        "ecs:DescribeTasks",
        "sns:Publish",
        "sns:ListSubscriptions"
      ],
      "Effect": "Allow",
      "Resource": "*"
    }
  ]
}
EOF
}

resource "aws_iam_role_policy_attachment" "terminate_hook_lambda_basic_execution" {
  role       = "${aws_iam_role.terminate_hook_lambda_role.name}"
  policy_arn = "${aws_iam_policy.terminate_hook_lambda_policy.arn}"
}

resource "aws_lambda_function" "terminating_hook" {
  s3_bucket         = "${var.lambda_s3_bucket}"
  s3_key            = "${lookup(var.lambda_s3_key, "asg-terminate-hook")}"
  s3_object_version = "${lookup(var.lambda_s3_object_version, "asg-terminate-hook")}"
  description       = "make sure no task running when terminating cluster instance."
  function_name     = "${var.app_name}-asg-terminate-hook"
  role              = "${aws_iam_role.terminate_hook_lambda_role.arn}"
  handler           = "handler"
  runtime           = "go1.x"
  memory_size       = 128
  timeout           = 10

  environment {
    variables = {
      LAMBDA_LOG_ENCODE               = "console"
      LAMBDA_AWS_REGION               = "ap-northeast-1"
      LAMBDA_LOG_LEVEL                = -1
      LAMBDA_EC2_TAG_KEY_CLUSTER_NAME = "ClusterName"
    }
  }
}

resource "aws_lambda_permission" "sns_invoke" {
  statement_id  = "${var.app_name}-allow-execute-from-sns"
  action        = "lambda:InvokeFunction"
  function_name = "${aws_lambda_function.terminating_hook.function_name}"
  principal     = "sns.amazonaws.com"
  source_arn    = "${aws_sns_topic.asg_lifecycle_hook_terminating.arn}"
}
lambda

前述したとおり、AutoscalingはClusterの情報をもたないので、terminateされようとしているEC2 Instanceから所属しているClusterを取得する必要があります。約束としてTagにClusterNameを設定する運用をおこなっていますが、lambdaの中では、tagとuser dataのparseという2つのアプローチでClusterNameを取得しようとしています。

package main

import (
 "bufio"
 "bytes"
 "context"
 "encoding/base64"
 "encoding/json"
 "fmt"
 "os"
 "regexp"
 "strconv"
 "strings"
 "time"

 "github.com/aws/aws-lambda-go/events"
 "github.com/aws/aws-lambda-go/lambda"
 "github.com/aws/aws-sdk-go/aws"
 "github.com/aws/aws-sdk-go/aws/session"
 "github.com/aws/aws-sdk-go/service/autoscaling"
 "github.com/aws/aws-sdk-go/service/ec2"
 "github.com/aws/aws-sdk-go/service/ecs"
 "github.com/aws/aws-sdk-go/service/sns"
 "github.com/davecgh/go-spew/spew"
 "github.com/juju/errors"
 "go.uber.org/zap"
)

const (
 // ECS Container Instanceの状態
 ConInsStatusActive   = "ACTIVE"
 ConInsStatusInactive = "INACTIVE"
 ConInsStatusDraining = "DRAINING"

 // ECS Taskの状態
 TaskStatusRunning = "RUNNING"
 TaskStatusStopped = "STOPPED"
 TaskStatusPending = "PENDING"
)

type App struct {
 Service *Service
 Options AppOptions
 Log     *zap.Logger
}

type AppOptions struct {
 LogLevel                     int
 LogEncode                    string
 Region                       string
 EC2InstanceTagKeyClusterName string
}

// 設定を環境変数から取得
func newAppOptionsFromEnv() AppOptions {
 logLevel, err := strconv.ParseInt(os.Getenv("LAMBDA_LOG_LEVEL"), 10, 0)
 if err != nil {
     logLevel = 0
 }

 logEncode := os.Getenv("LAMBDA_LOG_ENCODE")
 if logEncode == "" {
     logEncode = "console"
 }

 region := os.Getenv("LAMBDA_AWS_REGION")
 if region == "" {
     region = "ap-northeast-1"
 }

 clusterTag := os.Getenv("LAMBDA_EC2_TAG_KEY_CLUSTER_NAME")
 if clusterTag == "" {
     clusterTag = "ClusterName"
 }
 return AppOptions{
     LogLevel:  int(logLevel),
     LogEncode: logEncode,
     Region:    region,
     EC2InstanceTagKeyClusterName: clusterTag,
 }
}

func NewApp() *App {
 app, err := newApp(newAppOptionsFromEnv())
 if err != nil {
     fmt.Println("failed to init app", err.Error())
     os.Exit(1)
 }
 return app
}

func newApp(o AppOptions) (*App, error) {
 logger, err := GetLogger(
     WithLoggingLevel(o.LogLevel),
     WithEncoded(o.LogEncode),
 )
 if err != nil {
     return nil, err
 }

 service, err := NewService(o.Region)
 if err != nil {
     return nil, err
 }

 return &App{
     Service: service,
     Log:     logger,
     Options: o,
 }, nil
}

type Service struct {
 EC2 *ec2.EC2
 ECS *ecs.ECS
 ASG *autoscaling.AutoScaling
 SNS *sns.SNS
}

func NewService(region string) (*Service, error) {
 sess, err := session.NewSession(&aws.Config{
     Region: aws.String(region),
 })
 if err != nil {
     return nil, err
 }

 return &Service{
     EC2: ec2.New(sess),
     ECS: ecs.New(sess),
     ASG: autoscaling.New(sess),
     SNS: sns.New(sess),
 }, nil
}

// 実行時間が500ms程度なので即sns publishすると1secに複数回実行されてしまう
// contextから残り実行時間を取得し可能な限りwaitする
func (a *App) Wait(ctx context.Context) {
 deadline, ok := ctx.Deadline()
 a.Log.Debug("wait", zap.Time("deadline", deadline))
 if ok {
     wakeup := deadline.Add(time.Second * -2)
     if time.Now().Before(wakeup) {
         a.Log.Debug("wait", zap.Time("before_wait", time.Now()))
         select {
         case <-time.After(time.Until(wakeup)):
         }
         a.Log.Debug("wait", zap.Time("after_wait", time.Now()))
     }
 }
}

// このLambdaをもう一度実行するために、同じMessageをSNSにPublishする
func (a *App) PublishSNS(ctx context.Context, topicARN, message string) error {
 const subject = "publish sns to invoke me again"
 input := &sns.PublishInput{
     Message:  aws.String(message),
     Subject:  aws.String(subject),
     TopicArn: aws.String(topicARN),
 }

 a.Wait(ctx)
 output, err := a.Service.SNS.Publish(input)
 if err != nil {
     return errors.Trace(err)
 }
 a.Log.Info("publish_sns",
     zap.String("msg", "successfully publish sns"),
     zap.String("message_id", aws.StringValue(output.MessageId)))
 return nil
}

// Autoscaling GroupにLifecycle Hookの完了を通知する
func (a *App) CompleteASGLifecycleAction(e LifecycleEvent) error {
 input := &autoscaling.CompleteLifecycleActionInput{
     AutoScalingGroupName:  aws.String(e.AutoScalingGroupName),
     InstanceId:            aws.String(e.EC2InstanceID),
     LifecycleActionResult: aws.String("CONTINUE"),
     LifecycleActionToken:  aws.String(e.LifecycleActionToken),
     LifecycleHookName:     aws.String(e.LifecycleHookName),
 }
 _, err := a.Service.ASG.CompleteLifecycleAction(input)
 if err != nil {
     return errors.Trace(err)
 }
 a.Log.Info("comple_asg_lifecycle_hook",
     zap.String("msg", "successfully complete lifecycle hook"))
 return nil
}

// 指定されたcontainer instanceに配置されている指定された状態のTaskを返す
func (a *App) getTasks(clusterName, containerInstanceARN, status string) ([]*ecs.Task, error) {
 if clusterName == "" || containerInstanceARN == "" {
     return nil, errors.Errorf("clusterName %s and/or containerInstanceARN %s is empty", clusterName, containerInstanceARN)
 }
 inputList := &ecs.ListTasksInput{
     Cluster:           aws.String(clusterName),
     ContainerInstance: aws.String(containerInstanceARN),
     DesiredStatus:     aws.String(status),
 }

 var taskARNs []*string
 for {
     outputList, err := a.Service.ECS.ListTasks(inputList)
     if err != nil {
         return nil, errors.Annotatef(err, "clusterName %q, containerInstanceARN %q", clusterName, containerInstanceARN)
     }
     taskARNs = append(taskARNs, outputList.TaskArns...)
     if outputList.NextToken == nil {
         break
     }
     inputList = inputList.SetNextToken(*outputList.NextToken)
 }
 if len(taskARNs) == 0 {
     return nil, errors.NotFoundf("clusterName %q, containerInstanceARN %q", clusterName, containerInstanceARN)
 }

 // 一度に問い合わせできるTaskの上限は100
 var tasks []*ecs.Task
 var n, m int
 for {
     n = m
     m = m + 100
     if m > len(taskARNs) {
         m = len(taskARNs)
     }
     inputDesc := &ecs.DescribeTasksInput{
         Cluster: aws.String(clusterName),
         Tasks:   taskARNs[n:m],
     }
     outputDesc, err := a.Service.ECS.DescribeTasks(inputDesc)
     if err != nil {
         return nil, errors.Annotatef(err, "min %d, max %d", n, m)
     }
     tasks = append(tasks, outputDesc.Tasks...)
     if m >= len(taskARNs) {
         break
     }
 }
 return tasks, nil
}

func (a *App) GetRunningTasks(clusterName, ec2InstanceID string) ([]*ecs.Task, error) {
 if clusterName == "" || ec2InstanceID == "" {
     return nil, errors.Errorf("cluster_name %s and/or ec2_instance_id %s is empty", clusterName, ec2InstanceID)
 }
 containerInstance, err := a.getContainerInstance(clusterName, ec2InstanceID)
 if err != nil {
     return nil, errors.Trace(err)
 }

 containerInstanceARN := aws.StringValue(containerInstance.ContainerInstanceArn)
 tasks, err := a.getTasks(clusterName, containerInstanceARN, TaskStatusRunning)
 if err != nil {
     if errors.IsNotFound(errors.Cause(err)) {
         return nil, nil
     }
     return nil, errors.Trace(err)
 }
 return tasks, nil
}

// ECS Container Instanceと EC2 Instanceは区別されている
func (a *App) getContainerInstance(clusterName, ec2InstanceID string) (*ecs.ContainerInstance, error) {
 inputList := &ecs.ListContainerInstancesInput{
     Cluster:    aws.String(clusterName),
     MaxResults: aws.Int64(100),
 }

 var containerARNs []*string
 for {
     outputList, err := a.Service.ECS.ListContainerInstances(inputList)
     if err != nil {
         return nil, errors.Annotatef(err, "clusterName %q, ec2InstanceID %q", clusterName, ec2InstanceID)
     }
     containerARNs = append(containerARNs, outputList.ContainerInstanceArns...)
     if outputList.NextToken == nil {
         break
     }
     inputList = inputList.SetNextToken(*outputList.NextToken)
 }

 log := a.Log.With(zap.String("cluster_name", clusterName), zap.String("ec2_instance_id", ec2InstanceID))
 log.Debug("list_container_instance", zap.Strings("container_instance_arns", aws.StringValueSlice(containerARNs)))
 if len(containerARNs) == 0 {
     return nil, errors.NotFoundf("clusterName %q, ec2InstanceID %q", clusterName, ec2InstanceID)
 }

 inputDisc := &ecs.DescribeContainerInstancesInput{
     Cluster:            aws.String(clusterName),
     ContainerInstances: containerARNs,
 }
 outputDisc, err := a.Service.ECS.DescribeContainerInstances(inputDisc)
 if err != nil {
     return nil, errors.Annotatef(err, "clusterName %q, ec2InstanceID %q", clusterName, ec2InstanceID)
 }

 target := func(outputDisc *ecs.DescribeContainerInstancesOutput) *ecs.ContainerInstance {
     for _, conIns := range outputDisc.ContainerInstances {
         if strings.Compare(ec2InstanceID, aws.StringValue(conIns.Ec2InstanceId)) == 0 {
             return conIns
         }
     }
     return nil
 }(outputDisc)
 if target == nil {
     return nil, errors.Errorf("failed to fetch container instances from cluster %q, ec2_instance_id %q", clusterName, ec2InstanceID)
 }
 return target, nil
}

// ECS Taskの配置を停止し、既存のTaskをStopするためにContainer Instanceの状態を変更する
func (a *App) ChangeInstanceStatus(clusterName, containerInstanceARN, status string) error {
 if clusterName == "" || containerInstanceARN == "" {
     return errors.Errorf("clusterName %s and/or containerInstanceARN %s is empty", clusterName, containerInstanceARN)
 }
 input := &ecs.UpdateContainerInstancesStateInput{
     Cluster:            aws.String(clusterName),
     ContainerInstances: aws.StringSlice([]string{containerInstanceARN}),
     Status:             aws.String(status),
 }
 output, err := a.Service.ECS.UpdateContainerInstancesState(input)
 if err != nil {
     return errors.Annotatef(err, "clusterName %q, containerInstanceARN %q, status %s", clusterName, containerInstanceARN, status)
 }
 failures := output.Failures
 if len(failures) > 0 {
     buf := new(bytes.Buffer)
     for _, f := range output.Failures {
         buf.WriteString(fmt.Sprintf("%s %s\n", aws.StringValue(f.Arn), aws.StringValue(f.Reason)))
     }
     return errors.Errorf("clusterName %q, containerInstanceARN %q, status %s\nreported failures %s",
         clusterName, containerInstanceARN, status, strings.TrimRight(buf.String(), "\n"))
 }
 return nil
}

// ECS Container Instanceの状態をDraining状態にする
// 既にDrainingであればなにもしない
func (a *App) EnsureInstanceStatusDraining(clusterName, ec2InstanceID string) error {
 if clusterName == "" || ec2InstanceID == "" {
     return errors.Errorf("cluster_name %s and/or ec2_instance_id %s is empty", clusterName, ec2InstanceID)
 }
 containerInstance, err := a.getContainerInstance(clusterName, ec2InstanceID)
 if err != nil {
     return errors.Trace(err)
 }

 containerInstanceARN := aws.StringValue(containerInstance.ContainerInstanceArn)
 status := aws.StringValue(containerInstance.Status)
 log := a.Log.With(
     zap.String("cluster_name", clusterName),
     zap.String("ec2_instance_id", ec2InstanceID),
     zap.String("container_instance_arn", containerInstanceARN),
     zap.String("container_status", status))
 switch status {
 case ConInsStatusDraining:
     log.Info("ensure_container_status", zap.String("msg", "status already draining"))
 case ConInsStatusActive, ConInsStatusInactive:
     log.Info("ensure_container_status", zap.String("msg", "change container status to draining"))
     err := a.ChangeInstanceStatus(clusterName, containerInstanceARN, ConInsStatusDraining)
     if err != nil {
         return errors.Trace(err)
     }
     return a.EnsureInstanceStatusDraining(clusterName, ec2InstanceID)
 default:
     return errors.Errorf("unexpected container instance status %s", status)
 }
 return nil
}

var _clusterNameRegexp = regexp.MustCompile(`^[^#]*ECS_CLUSTER=(?P<name>[${}\w]+) ?.*$`)

func matchClusterName(line string) (clusterName string, ok bool) {
 m := _clusterNameRegexp.FindStringSubmatch(line)
 if len(m) == 2 {
     clusterName, ok = m[1], true
 }
 return
}

func clusterNameFromECSConfig(ecsConfig []byte) (clusterName string, err error) {
 s := bufio.NewScanner(bytes.NewReader(ecsConfig))
 for s.Scan() {
     clusterName, found := matchClusterName(s.Text())
     if found {
         return clusterName, nil
     }
 }
 return "", s.Err()
}

func clusterNameFromUserData(encoded string) (clusterName string, err error) {
 if encoded == "" {
     return "", errors.New("encoded userdata is empty")
 }
 decoded, err := base64.StdEncoding.DecodeString(encoded)
 if err != nil {
     return "", errors.Annotatef(err, "encoded userdata: %s", encoded)
 }
 return clusterNameFromECSConfig(decoded)
}

func (a *App) fetchClusterNameFromUserData(ec2InstanceID string) (clusterName string, err error) {
 // 1. UserData取得
 // 2. base64 decode
 // 3. ClusterName 抽出
 // 4. チェック($ついていない)
 input := &ec2.DescribeInstanceAttributeInput{
     Attribute:  aws.String("userData"),
     InstanceId: aws.String(ec2InstanceID),
 }
 output, err := a.Service.EC2.DescribeInstanceAttribute(input)
 if err != nil {
     return "", errors.Annotatef(err, "ec2 instance id %s", ec2InstanceID)
 }
 if output.UserData == nil {
     return "", errors.Errorf("DescribeInstanceAttributeOuteput.UserData is nil")
 }
 encodedUserData := aws.StringValue(output.UserData.Value)

 clusterName, err = clusterNameFromUserData(encodedUserData)
 if err != nil {
     return "", errors.Trace(err)
 }

 log := a.Log.With(zap.String("ec2_instance_id", ec2InstanceID))
 log.Info("fetch_cluster_name", zap.String("value", clusterName))
 // echo ECS_CLUSTER=$cluster >> /etc/ecs.configのような変数参照をcare
 if strings.Contains(clusterName, "$") {
     log.Info("fetch_cluster_name",
         zap.String("msg", "discard invalid cluster name"),
         zap.String("cluster_name", clusterName))
     clusterName = ""
 }
 return
}

// ec2 instanceに設定されているtagの値を返す
func tagValue(tags []*ec2.Tag, key string) string {
 for _, tag := range tags {
     if strings.Compare(key, aws.StringValue(tag.Key)) == 0 {
         return aws.StringValue(tag.Value)
     }
 }
 return ""
}

func (a *App) fetchClusterNameFromTag(ec2InstanceID string) (clusterName string, err error) {
 input := &ec2.DescribeInstancesInput{
     InstanceIds: aws.StringSlice([]string{ec2InstanceID}),
 }
 output, err := a.Service.EC2.DescribeInstances(input)
 if err != nil {
     return "", errors.Trace(err)
 }

 target := func(output *ec2.DescribeInstancesOutput) *ec2.Instance {
     for _, rsv := range output.Reservations {
         for _, ins := range rsv.Instances {
             if strings.Compare(ec2InstanceID, aws.StringValue(ins.InstanceId)) == 0 {
                 return ins
             }
         }
     }
     return nil
 }(output)
 if target == nil {
     return "", errors.Annotatef(err, "failed to fine ec2 instance %s", ec2InstanceID)
 }

 return tagValue(target.Tags, a.Options.EC2InstanceTagKeyClusterName), nil
}

// EC2 Instance IDからClusterの名前を取得する
// UserData, Tagから取得を試みる
func (a *App) FetchClusterName(ec2InstanceID string) (clusterName string, err error) {
 ec2InstanceID = strings.TrimSpace(ec2InstanceID)
 if ec2InstanceID == "" {
     return "", errors.New("FetchClusterName: ec2InstanceID is empty")
 }

 clusterName, err = a.fetchClusterNameFromUserData(ec2InstanceID)
 if err != nil {
     return "", errors.Trace(err)
 }
 if clusterName != "" {
     return clusterName, nil
 }

 clusterName, err = a.fetchClusterNameFromTag(ec2InstanceID)

 return clusterName, errors.Trace(err)
}

// SNS Messageに格納されているLifecycle Hookの情報
// Document https://docs.aws.amazon.com/ja_jp/autoscaling/ec2/userguide/lifecycle-hooks.html#sns-notifications
// 値のsample
// (main.LifecycleEvent) {
// LifecycleHookName: (string) (len=37) "xxx-cluster-terminationg-hook",
// AccountId: (string) (len=12) "XXXXXXXXXXXXXXXXXXXXX",
// RequestId: (string) (len=36) "XXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
// LifecycleTransition: (string) (len=36) "autoscaling:EC2_INSTANCE_TERMINATING",
// AutoScalingGroupName: (string) (len=17) "xxx-xxx-xxx-xxx",
// Service: (string) (len=16) "AWS Auto Scaling",
// TimeRaw: (string) (len=24) "2080-03-03T11:22:33.855Z",
// EC2InstanceId: (string) (len=19) "i-YYYYYYYYYYYYYY",
// NotificationMetadataRaw: (string) (len=41) "{\n \"cluster\": \"\"\"xxx-xxx-xxx-xxx\"\n}\n",
type LifecycleEvent struct {
 LifecycleHookName       string `json:"LifecycleHookName"`
 AccountID               string `json:"AccountId"`
 RequestID               string `json:"RequestId"`
 LifecycleTransition     string `json:"LifecycleTransition"`
 AutoScalingGroupName    string `json:"AutoScalingGroupName"`
 Service                 string `json:"Service"`
 TimeRaw                 string `json:"Time"`
 EC2InstanceID           string `json:"EC2InstanceId"`
 NotificationMetadataRaw string `json:"NotificationMetadata"`
 LifecycleActionToken    string `json:"LifecycleActionToken"`

 // 任意のjsonをSNS Topic作成時に指定できる.
 // 現時点では利用していない
 NotificationMetadata map[string]interface{}
}

func NewLifecycleEventFromJSON(raw string) (LifecycleEvent, error) {
 var event LifecycleEvent
 if raw == "" {
     return event, errors.New("NewLifecycleEventFromJSON: empty json")
 }

 err := json.Unmarshal([]byte(raw), &event)
 if err != nil {
     return event, errors.Errorf("NewLifecycleEventFromJSON: json unmarshal error: %s", err)
 }

 meta := event.NotificationMetadataRaw
 if meta != "" {
     event.NotificationMetadata = make(map[string]interface{})
     if err := json.Unmarshal([]byte(meta), &event.NotificationMetadata); err != nil {
         // 現時点では値を利用していないのでerrorを無視する
     }
 }
 return event, nil
}

// SNS通知のValidation
type EventValidator func(LifecycleEvent) error

func serviceIsAutoScaling(e LifecycleEvent) error {
 const want = "AWS Auto Scaling"
 if e.Service != want {
     return errors.Errorf("LifecycleEvent.Service should be %q, but %q", want, e.Service)
 }
 return nil
}

func transitionIsTerminating(e LifecycleEvent) error {
 const want = "autoscaling:EC2_INSTANCE_TERMINATING"
 if e.LifecycleTransition != want {
     return errors.Errorf("LifecycleEvent.LifecycleTransition should be %q, but %q", want, e.LifecycleTransition)
 }
 return nil
}

func ValidateEvent(event LifecycleEvent, fns ...EventValidator) error {
 for _, f := range fns {
     if err := f(event); err != nil {
         return errors.Trace(err)
     }
 }
 return nil
}

// 1. SNSからTerminate対象EC2 Instanceを取得
// 2. EC2 InstanceからCluster Nameを取得(userdata, tag)
// 3. EC2 Instanceの状態をDrainingに変更
// 4. EC2 Instanceのrunning task数を確認
//   1. task数 == 0 => OK
//   2. task数 >= 1 => SNSに再度通知して終了
// 5. AutoscalingにLifecycle Action完了を通知
func (a *App) HandleRecord(ctx context.Context, record events.SNSEventRecord) error {
 lcEvent, err := NewLifecycleEventFromJSON(record.SNS.Message)
 if err != nil {
     return errors.Trace(err)
 }
 a.Log.Debug("parse_lifecycle_event", zap.String("dump", spew.Sdump(lcEvent)))

 err = ValidateEvent(lcEvent,
     serviceIsAutoScaling,
     transitionIsTerminating)
 if err != nil {
     return errors.Annotatef(err, "failed to validate lifecycle event")
 }

 clusterName, err := a.FetchClusterName(lcEvent.EC2InstanceID)
 if err != nil {
     return errors.Annotatef(err, "failed to fetch cluster name from ec2 instance %s", lcEvent.EC2InstanceID)
 }
 if clusterName == "" {
     return errors.Errorf("failed to fetch cluster name from ec2 instance %s", lcEvent.EC2InstanceID)
 }
 log := a.Log.With(
     zap.String("cluster_name", clusterName),
     zap.String("ec2_instance_id", lcEvent.EC2InstanceID),
 )
 log.Info("fetch_cluster_name")

 err = a.EnsureInstanceStatusDraining(clusterName, lcEvent.EC2InstanceID)
 if err != nil {
     errors.Annotatef(err, "ec2 instance: %s", lcEvent.EC2InstanceID)
 }

 runningTasks, err := a.GetRunningTasks(clusterName, lcEvent.EC2InstanceID)
 if err != nil {
     return errors.Trace(err)
 }
 log.Info("fetch_running_tasks", zap.Int("running_tasks_num", len(runningTasks)))

 if len(runningTasks) == 0 {
     log.Info("complete_asg_lifecycle_action",
         zap.String("lifecycle_hook_name", lcEvent.LifecycleHookName),
         zap.String("autoscaling_group_name", lcEvent.AutoScalingGroupName))
     return a.CompleteASGLifecycleAction(lcEvent)
 }
 log.Info("publish_sns",
     zap.String("topic_arn", record.SNS.TopicArn),
     zap.String("message", record.SNS.Message))
 return a.PublishSNS(ctx, record.SNS.TopicArn, record.SNS.Message)
}

func (a *App) HandleEvent(ctx context.Context, event events.SNSEvent) error {
 a.Log.Info("start")

 for _, record := range event.Records {
     select {
     case <-ctx.Done():
         return ctx.Err()
     default:
         if err := a.HandleRecord(ctx, record); err != nil {
             a.Log.Error("failed", zap.String("details", errors.Details(err)))
             return err
         }
     }
 }

 a.Log.Info("success!")
 return nil
}

func main() {
 app := NewApp()
 lambda.Start(app.HandleEvent)
}

Goのlambdaはbuildしたbinaryをzip化してS3に設置するだけなのでシンプルです。terraform側ではbucket/keyとversionIDを指定しています。

GOOS=linux GOARCH=amd64 go build -o _build/handler .
(cd _build && zip handler.zip handler)
aws s3 mv _build/handler.zip s3://backet/prefix/asg-terminate-hook/handler.zip
aws s3api head-object --bucket bucket --key prefix/asg-terminate-hook/handler.zip --query 'VersionId' --output=text

コンテナは動的に増減していくので、API作成時は後述する12factor app 9の 廃棄容易性を意識してすばやくshutdownできる必要があります。

GoでのAPI Server実装

利用したpackage

主に以下のpackageを利用しました

gin

http関連のhandlingはginを利用しました、requestをparseしてjsonを返すsimpleなAPIなので特に問題なく利用できまし。URLのroutingにはhttprouter(https://github.com/julienschmidt/httprouter)が利用されているので,URLが特殊な場合は事前に対応できるか確認が必要です。validate処理にはvalidator(https://github.com/go-playground/validator)を利用してvalidate処理を行いました

gorm

DB処理にはgorm(https://github.com/jinzhu/gorm)を利用しました。ORMというよりはSQL Builder的に利用しました。document(http://gorm.io/docs/)が充実しており、とても利用しやすかったです。 subqueryが必要な場合は明示的にsubqueryを生成する必要があるので、裏側でとんでもないSQLが生成されるといったこともなく、過度な抽象化は行わない方針で進めました。

zap

個人的にはGoのloggingについて、標準 packageの log 以外に決定版が存在していないと思っており,皆様がloggingをどのように行っているか気になっているところです。 今回は構造化されたloggingを行いたかったのでuber社が公開しているzap(https://github.com/uber-go/zap)を利用しました。

package layout

 今回の一番の悩みどころがpackageのlayout(directory構成)でした。 Goの場合Webframeworkといった形で構成から命名規則まで統一的に管理するより各component単位でpackageを組み合わせ、できるだけ小さく保っていくアプローチがマッチしていると感じています。そこで、Clean Architectureの考えにのっとり,domain層(業務logic),httpに関連するcontroller層,データのCRUDを担当するrepository層の分離を意識したpackage構成にしました。自分自身、clean architectureの概念を勉強中で Clean Architecture: A Craftmans\'s Guide to Software Structure and Design(https://www.amazon.co.jp/dp/B075LRM681)や各種blog等を読ませていただき悪戦苦闘している状態です。  個人的には以下の点を満たせていればよいのではと思っております

  • 業務logicを表現するlayerは他のlayerに依存せず,必要なサービスや機能はinterfaceで表現
  • interfaceを実装するうえで共通のリソース(DB,検索engin,cache...)に依存する処理をpackageに落とし込む
  • http関連の概念は担当layerでまとめて、layer間はできるだけprimitiveなデータとinterfaceだけの依存関係にする

12 factor app

Dockerを用いて開発したこともあり、12 factor app(https://12factor.net/)に適合するよう意識しました. 12 factor appとGoについては12 Factor Applications with Docker and Go(https://www.amazon.co.jp/12-Factor-Applications-Docker-English-ebook/dp/B075HWVLMC)が参考になりました。

コードベース

githubを利用しているので、ここは特に意識しませんでした。

依存関係

Goのpackage管理にはdepを利用しました。

設定

設定は設定fileを用意することはせず、すべて環境変数から渡すようにしました。そのため設定値を管理するためにviper(https://github.com/spf13/viper)を利用したりはせず、直接環境変数を読んで構造体にbindするシンプルなものにしました。認証情報はAWS SecretsManager上に保持するようにしました。

バックエンドサービス

前述しました、clean architectureにそっていれば自然と外部サービスへの依存はinterfaceとして切り出されているはずなので、特に意識することなく環境変数を通じた切り替えができました。

ビルド, リリース, 実行

この箇所は自分の理解が曖昧なのですが、リリースを一意に識別し、ロールバックする点については、ECRを通じたimage管理で実現できていると思っています。

プロセス

単純なAPI Serverだったので特に意識せずにstatelessかつshared-nothingが実現できたのではないでしょうか。

ポートバインディング

Goを利用していれば net/http packageのおかげて自然とport bindingまで完結されるで特に意識しませんでした。

並行性

理解が曖昧ですが、share-nothingが満足されていれば、自然と満たせると考えております。 この考えを推し進めるとあまり多くのgoroutineを生成すべきでないという結論にいきつくのでしょうか。

廃棄容易性

廃棄容易性と仰々しいですが、signalをhandlingしてhttp serverをshutdownする処理を加えました。 ginのsampleを参考にしました(https://github.com/gin-gonic/gin/blob/master/examples/graceful-shutdown/graceful-shutdown/server.go) autoscalingの中でappの内容をできるだけ意識したくないので、起動/終了処理が数秒で完了することは運用面からとても重要な性質だと実感しています。 非同期で処理するジョブを抱え込んだりして、すぐに終了できなくなった場合は黄色信号で、Queue等を通じて処理を分散させる必要があります。

ログ

12factor appを読む前はログといえばログファイルに書き込んでローテションさせるものと思っており、app側が出力ストリームより向こう側を意識してはいけないという主張は極端だなと感じていました。ところが実際にapp側はSTDOUTに書き込むだけにすることで、logging関連処理をappから切り離し、ローテション処理やストレージ管理といった依存性もなくすことができました。 ログ管理については後述します。

管理プロセス

この考えは徹底できておらず、今後の改善が望まれる箇所です。 定期実行が必要な処理はlambda等に切り出し、DBや外部サービスの管理は別途オーケストレーションツール(chef,ansible)で行っています。

Log

logging処理の流れは, App -> fluentd -> bigquery というシンプルな構成をとりました。

app

loggingに関しては12 factor ppの箇所でも述べた通り、app側ではSTDOUTへの書き込み以降は意識しないようになっており、出力先はtask definition(docker-componse.yml)で制御しています。 以下のようにlogging.dirverとしてfluentdを指定しています。logging driverにfluentdがサポートされているので、appの中ではfluentdに書き出していることも意識しないことができました.

version: '2'
services:
  api:
    image: ${IMAGE_TAG_API}
    mem_limit: ${mem_limit}
    ports:
      - "0:80"
    environment:
       - APP_ENV=${APP_ENV}
    command: /usr/local/bin/api-runner
    logging:
      driver: fluentd
      options:
        fluentd-address: ${FLUENTD_ADDRESS}
        tag: apiv2.{{.Name}}.{{.ID}}

fluentd

fluentdについてもAPI Server同様、ECSで管理しています。 logがどのように利用されるかはlogging処理作成時点ではわかっていない/運用次第で変化しうる箇所なので、いったんaggreageする層が必要だと考えています。 fluentd imageについては公式を参考に以下のようなDockerfileになりました

FROM fluent/fluentd:v1.1.1-debian-onbuild

# install plugins
RUN buildDeps="sudo make gcc g++ libc-dev ruby-dev" \
 && apt-get update \
 && apt-get install -y --no-install-recommends $buildDeps \
 && sudo gem install \
        fluent-plugin-bigquery:1.2.0 \
 && sudo gem sources --clear-all \
 && SUDO_FORCE_REMOVE=yes \
    apt-get purge -y --auto-remove \
                  -o APT::AutoRemove::RecommendsImportant=false \
                  $buildDeps \
 && rm -rf /var/lib/apt/lists/* \
           /home/fluent/.gem/ruby/2.3.0/cache/*.gem

# ONBUILDでcurrentのfluent.confは/fluentd/etc/fluent.confにCOPYされる
RUN mkdir -p /fluentd/etc/files
ADD files /fluentd/etc/files/

# -v  -> debug
# -vv -> trace
ENV FLUENTD_OPT ""

CMD exec fluentd -c /fluentd/etc/fluent.conf -p /fluentd/plugins $FLUENTD_OPT

fluentdがAPI Serverの構成と異なる点は, ALB ListenerのprotocolをTCPにする関係で動的ポートの設定ができず、1Host-1Taskなってしまっている点です。この点は今後の改善点ですが現状ではfluend側の機能で吸収しきれています。また、fluentd側のlogDriverにはawslogsを設定し、そのままforwardすることで、Cloudwatchからlogを確認できるようにしています.

[
    {
        "name": "fluentd",
        "image": "${image_url}",
        "cpu": 0,
        "memory": ${memory},
        "portMappings": [
            {
                "hostPort": 24224,
                "protocol": "tcp",
                "containerPort": 24224
            }
        ],
        "logCOnfiguration": {
          "logDriver": "awslogs",
          "options": {
            "awslogs-region": "${region}",
            "awslogs-group": "{cw_logs_group}",
            "awslogs-stream-prefix": "${cw_stream_prefix}"
          }
        },
        "essential": true
    }
]

bigquery

GCP Serviceのひとつであるbigqueryがlogの最終的な格納先です。 bigquery自体は既存システムが利用していた関係で採用されていますが、logを構造化して出力したい理由のひとつでした。 というのも、bigqueryは事前に定義されたschemaをもつので、loggingもアドホックにせず決められたkeyを出力する必要があったからです。 bigqueryへの入力はfluentdのfilter pluginを用いて、jsonをparseし、bigquery plugin(https://github.com/kaizenplatform/fluent-plugin-bigquery)を利用してinsertしました。logの分析がbigqueryを通じて行えるので, SQLが使えるのがとてもありがたいです。

-- 問題のあるendpointの抽出
SELECT *
FROM <log_table>
WHERE
  status_code != 200
  AND
  path = "/version/endpoint"
  AND
  timestamp BETWEEN <FROM> AND <TO>

-- endpointごとのlatency
SELECT path, AVG(latency) AS latency_avg
FROM <log_table>
GROUP BY path

運用

監視

外形監視

外形監視についてはMackerel(https://mackerel.io/) を利用しております。 Mackerelはsimpleで使いやすくUIも洗練されていて,とてもすばらしいサービスだと思います。(もちろん(CLI)https://mackerel.io/ja/docs/entry/advanced/cli)ただし、リクエスト間隔は1分で固定なので注意が必要です。 EC2 InstanceのMetricはECSを通じて収集しているので、mackerel-agent(https://github.com/mackerelio/mackerel-agent)は利用していないのですが、Go製でServerのMetricとはどうやって収集するのか興味があるので、ソースを読んで勉強させてもらいたいと思っています。

Metric監視

CloudWatch AlarmをECSの各種Metricに設定し,SNS経由でpagerdutyに集約するようにしています。

ecs-agent監視

ecs-agentについてはECS API DescribeContainerInstancesからAgentConnectedを取得し、接続を確認できない場合はSNSに通知するLambdaをCloudWatch Eventで定期実行しています。

fluentd監視

fluentdの監視についてはCloudWatch LogGroupを作成し、log metric filterを作成し、シンプルにlog中のerrorを拾ってMetricに変換しています。bigqueryのinsertに関してはfluentdのlogGroupに対してlog subscriptionを設定し、lambdaを呼び出しています。lambdaの中でfluentdのlogをparseし、CloudWatchのPutMetrics APIを呼び出しています。

resource "aws_cloudwatch_log_subscription_filter" "watch_fluentd" {
  name            = "fluentd_status_logfilter"
  log_group_name  = "${aws_cloudwatch_log_group.log.name}"
  filter_pattern  = "${var.fluentd_subscription_filter_pattern}"
  destination_arn = "${aws_lambda_function.watch_fluentd.arn}"
  distribution    = "ByLogStream"

  depends_on = ["aws_lambda_permission.allow_cw_invoke_lambda"]
}
package main

import (
 "context"
 "encoding/json"
 "fmt"
 "log"
 "os"
 "strconv"
 "strings"

 ac "github.com/howtv/go-awsctr"

 "github.com/aws/aws-lambda-go/events"
 "github.com/aws/aws-lambda-go/lambda"
 "github.com/juju/errors"
 "github.com/koron/go-dproxy"
 "go.uber.org/zap"
)

// App -
type App struct {
 Service *Service
 Options AppOptions
 Log     *zap.Logger
}

// AppOptions -
type AppOptions struct {
 LogLevel  int
 LogEncode string
}

// 設定を環境変数から取得
func newAppOptionsFromEnv() AppOptions {
 logLevel, err := strconv.ParseInt(os.Getenv("LAMBDA_LOG_LEVEL"), 10, 0)
 if err != nil {
     logLevel = 0
 }

 logEncode := os.Getenv("LAMBDA_LOG_ENCODE")
 if logEncode == "" {
     logEncode = "console"
 }

 return AppOptions{
     LogLevel:  int(logLevel),
     LogEncode: logEncode,
 }
}

// NewApp -
func NewApp() *App {
 app, err := newApp(newAppOptionsFromEnv())
 if err != nil {
     fmt.Println("failed to init app", err.Error())
     os.Exit(1)
 }
 return app
}

func newApp(o AppOptions) (*App, error) {
 logger, err := GetLogger(
     WithLoggingLevel(o.LogLevel),
     WithEncoded(o.LogEncode),
 )
 if err != nil {
     return nil, err
 }

 service, err := NewService(o)
 if err != nil {
     return nil, err
 }

 return &App{
     Service: service,
     Log:     logger,
     Options: o,
 }, nil
}

// Service -
type Service struct {
 Aws *awsResource
}

// NewService -
func NewService(o AppOptions) (*Service, error) {
 awsRsc := newAWSResource()
 return &Service{
     Aws: awsRsc,
 }, nil
}

type awsResource struct {
 cw ac.CloudWatch
}

func newAWSResource() *awsResource {
 sess := ac.NewSession("ap-northeast-1")
 return &awsResource{
     cw: ac.NewCloudWatch(sess),
 }
}

// FluentStatus -
type FluentStatus struct {
 BufferQueueLength     string
 BufferTotalQueuedSize string
 RetryCount            string
}

// ParseMessage -
// input sample:
// 2018-05-02 05:26:46.346961637 +0000 fluent.status: { "plugin_id": "bq_nginx", "plugin_category": "output", "type": "bigquery", "output_plugin": true, "buffer_queue_length": 0, "buffer_total_queued_size": 0, "retry_count": 0 }
func ParseMessage(msg string) ([]ac.CountMetricsInfo, error) {
 msgs := strings.SplitN(msg, " ", 5)
 m := strings.Join(msgs[4:], "")

 var fluentStatus interface{}
 err := json.Unmarshal([]byte(m), &fluentStatus)
 if err != nil {
     return nil, err
 }
 f := dproxy.New(fluentStatus)
 pluginIDValue, _ := f.M("plugin_id").String()
 bufferQueueLength, _ := f.M("buffer_queue_length").Float64()
 bufferTotalQueueSize, _ := f.M("buffer_total_queued_size").Float64()
 retryCount, _ := f.M("retry_count").Float64()

 var info []ac.CountMetricsInfo
 info = []ac.CountMetricsInfo{
     ac.CountMetricsInfo{
         NameSpace:      "fluentd",
         DimensionName:  "PluginId",
         DimensionValue: pluginIDValue,
         MetricName:     "buffer_queue_length",
         Value:          bufferQueueLength,
     },
     ac.CountMetricsInfo{
         NameSpace:      "fluentd",
         DimensionName:  "PluginId",
         DimensionValue: pluginIDValue,
         MetricName:     "buffer_total_queued_size",
         Value:          bufferTotalQueueSize,
     },
     ac.CountMetricsInfo{
         NameSpace:      "fluentd",
         DimensionName:  "PluginId",
         DimensionValue: pluginIDValue,
         MetricName:     "retry_count",
         Value:          retryCount,
     },
 }

 return info, nil
}

// HandleLogEvent -
func (a *App) HandleLogEvent(e events.CloudwatchLogsLogEvent) error {
 a.Log.Debug("handle_log_event",
     zap.String("id", e.ID),
     zap.Int64("timestamp", e.Timestamp),
     zap.String("message", e.Message),
 )

 metrics, err := ParseMessage(e.Message)
 if err != nil {
     return errors.Annotatef(err, "failed to parse event message %s", e.Message)
 }

 for _, m := range metrics {
     err = a.Service.Aws.cw.PutCountMetrics(m)
     if err != nil {
         return errors.Annotatef(err, "failed to put metrics %#v", m)
     }
 }

 a.Log.Info("put_metrics", zap.String("msg", "put log message successfully"))
 return nil
}

// parseRawData -
func (a *App) parseRawData(raw events.CloudwatchLogsRawData) (events.CloudwatchLogsData, error) {
 return raw.Parse()
}

// HandleEvent -
func (a *App) HandleEvent(ctx context.Context, event events.CloudwatchLogsEvent) error {
 log.Println("start")

 data, err := a.parseRawData(event.AWSLogs)
 if err != nil {
     return errors.Annotatef(err, "failed to parse event raw data")
 }

 a.Log.Info("handle_event",
     zap.String("log_group", data.LogGroup),
     zap.String("log_stream", data.LogStream),
     zap.Strings("subscription_filters", data.SubscriptionFilters),
     zap.String("message_type", data.MessageType),
     zap.Int("log_events_num", len(data.LogEvents)))

 for _, logEvent := range data.LogEvents {
     select {
     case <-ctx.Done():
         return ctx.Err()
     default:
         if err := a.HandleLogEvent(logEvent); err != nil {
             a.Log.Error("failed", zap.String("details", errors.Details(err)))
             return err
         }
     }
 }

 log.Println("success")
 return nil
}

func main() {
 app := NewApp()
 lambda.Start(app.HandleEvent)
}

まとめ

  • terraformはcloudformationより使いやすかった
  • ECSはlaunch typeをEC2にする限り、ClusterとService2つのautoscalingの設定が必要
  • Clusterをscale inする際は自前でgracefulな処理を作らないといけない
  • 12 factor appに従うようにしておくと、アプリをコンテナ化して運用しやすい
  • Goは楽しい

お知らせ

ハウテレビジョンでは、エンジニアを募集しています。AWS,Go,Dockerで一緒にシステムを作っていきませんか。ご連絡はこちら

React Nativeにおける多タブかつ件数の多いリストをつつがなく表示させるには

TL; DR

  • はじめに
  • 主要なライブラリ/ディレクトリ構成
  • このようなリストのおはなしです
  • 実装上で気をつけたポイント
  • 起こりがちな問題点 NINJA
  • NINJAを防ぐにはどうするか
  • おわりに

はじめに

こんにちは!世界で挑戦したいと思う学生に向けた就活支援プラットフォーム、外資就活ドットコムの開発チームです。 先日iPhoneアプリをReact Nativeでリニューアルの上リリースしました!(パチパチ)

さて、このアプリは募集情報や体験記、コミュニティなど、比較的件数が多いリストを表示させる箇所がいくつかあります。

そしてそういったリストも業界などのカテゴリやタグでのタブ切り替え表示やソート順などでの切り替え表示をさせています。

つまり、

- データの管理
- 大量のコンポーネント(->ここではリストのアイテム1個1個のこと)の表示管理
- 遅延の少ない切り替え

といったことが必要になってきます。

それを実現するまでにどのような構成で作っていったか、また想定外のハードルも乗り越える必要があったのでそこで得た知見をシェアできればと思います。

主要なライブラリ/ディレクトリ構成

本題に入る前に主要なライブラリ構成を書いておきます。(一部抜粋)

redux:状態を一元管理
react-redux:reactとreduxをつなげる
redux-saga:非同期処理
redux-persist:データ永続化
react-native-router-flux:画面遷移 (使い方はreact-routerに似ている)
styled-components:セマンティックにCSS設計ができる
NativeBase:UIコンポーネント補助

またこのようなディレクトリ構成になっています。

src:
  actions:reduxのaction
  api:sagaで呼び出すapi
  components:各画面で呼び出すコンポーネント (部品)
  constants:定数定義(reduxのaction名他)
  containers:画面(reduxとconnect)
  reducers:reduxのreducer
  router:react-native-router-fluxのrouter
  saga:redux-sagaのsaga

*本エントリでは各項目の詳しい内容についてはレクチャーしないので、参考リンクを置いておきます

このようなリストのおはなしです

下記画像のように業界タブごとに表示させることも可能で、 企業名や体験記の種別で絞り込んだり、人気順などでソートすることも出来ます。

f:id:tacker-howte:20180629171626p:plain

実装上で気をつけているポイント

さて、つらつらと書いていくと長くなってしまうので、気をつけたポイントを列挙していこうと思います。

データの管理

リストの内容とフォロー状態などのstateは分ける

リスト取得時にフォロー状態も合わせて取得できるようなAPIにしていますが、それをreduxのstateに保管するときは内容とフォロー状態(KeyValue)、またページング(KeyValue)に分けて保管しています。

そうすることでフォローのコンポーネントがそのリストアイテム以外にある場合でも瞬時に状態を共有でき、変わった場合それぞれのコンポーネントで再レンダリングを走らせることが可能になります。

また下記のように、内容とページングはカテゴリ(タブ)ごと、またソート順ごとに保管するためそこまではObject(連想配列)で保管するのに対し、フォロー状態はアイテムのKeyValueなので単純な配列で保管しています。

reducers/reportListReducer.js

  const initialState = {
    ...
    actionPage: {},
    reportListByIndustryCategory: {},
    reportClips: [],
    ...
  }

ソート順を保管し、各ソート順のページ状態も保管する

先ほどソート順に保管すると書きましたが、 これはそうすることで単純なソート切り替え時に(すでに取得済みの場合)通信が走らずにパッと切り替えられるようにすることを目指しました。

またページングも合わせて保管することでエッジケースの齟齬がないようにしています。

(ここはSagaを利用してバックグラウンドで表示しているタブ以外も逐次取得したいと思っているのですが、フィルタ切り替えを頻繁に行うなどの際に今の所うまくいっていないのでちょっと放置しています)

reducers/reportListReducer.js

  case REPORT_LIST_GET_LIST_SUCCESS: {
      const lists = deepCloneObj(state.reportListByIndustryCategory);
      const actionPage = deepCloneObj(state.actionPage);
      ...
      actionPage[`${action.industryCategoryId}`][`${action.order}`] = action.page;
      ...
      lists[`${action.industryCategoryId}`][`${action.order}`] = lists[`${action.industryCategoryId}`][`${action.order}`].concat(action.reports);

      const reportClips = state.reportClips.slice();
      action.reports.forEach((val) => {
        reportClips[val.id] = { clipped: val.clipped, count: val.clipsCount };
      });

      return {
        ...
        reportListByIndustryCategory: lists,
        ...
        actionPage,
        reportClips,
      };
    }

大量のコンポーネントの表示管理

React Nativeではいかに再レンダリングを走らせないようにし、端末に負担をかけさせないようにするかが重要になります。

そのためコンポーネントは(ユーザーがフォローを行うなど)頻繁に表示が変わりうるものとそうでないものは分けています。 表示が変わらないものは shouldComponentUpdate=false or PureComponent 使用をしています。

また、リストのコンポーネントはVirtualizedList を使用しているのですが、Props removeClippedSubviewstrue にすることでメモリ使用量の増大や突然アプリが落ちる事態を防いでいます。

遅延の少ない切り替え

  • react-native-scrollable-tab-viewprerenderingSiblingsNumber1にする

    こちら、NativeBaseのScrollableTabを使用すると実質的にはこのライブラリを使うことになるのですが、 この prerenderingSiblingsNumber に1を設定しないと タブ切り替え時の挙動がモタっとしてしまいます。

React NativeではちょっとしたPropsの設定で挙動のタイミングが大幅に変わり、 体験も大きく変わってしまうことがよくあるので気をつけましょう。

  • react-native-scrollable-tab-view の onChangeTab で変更したカテゴリ情報を componentWillReceiveProps で検知する

    まだリストを取得していないタブに移動する場合、当初は onChangeTab でリスト取得処理も合わせて行なっていたのですが、そうすると描画までもたついてしまうんですね。

    そこでここでは単純なタブ移動(ここではつまりカテゴリの変更)のみreduxに通知、保管し、それを reactのlifeCycleである componentWillReceiveProps で 状態の変更を検知し、リスト取得処理を走らせるようにしました。 これによりレンダリングはそこまで遅延せずに済むようになっています。

    • ちなみにUpdate on Async Rendering にあるように、 componentWillReceiveProps は React 17以降削除されてしまうという移り変わりの早さが良くも悪くもすごいですね。

※なおデータの管理で書いた

- ソート順を保管し、各ソート順のページ状態も保管する

も遅延の少ない切り替えに貢献しています。

起こりがちな問題点 NINJA

上で書いた実装上の気をつけるポイントをしなくても、社内でNINJAと呼ばれているレンダリングの不正が起こります。

開発スタートした当初はリストで11番目のアイテムを表示させようとしただけでリスト全体がぐしゃっと重なった表示(隠れ身の術)になるなど、常にリスト周りは暗雲が立ち込めていました。。

また、タブもうまくデータ管理しておかないとタブが左右に高速回転しだす(分身の術)など、憎めないヤツです(笑) f:id:tacker-howte:20180629175313g:plain

一番悩まされたのは下部メニューを切り替えるとたまーに真っ白になるケースでした。これはとあるウルトラCを使っているのですが、気になる方はぜひオフィスに遊びに来てください♪

NINJAを防ぐにはどうするか

一番疑った方が良いのは、意図せず複数のAPI取得が走っていないか、またニアリーイコールですが意図せぬ挙動が走っていないかです。

なので action がいつ走っているか常にデバックしないとですね。

そしてそれでもダメな場合はreactやredux、redux-saga公式のdocsとにらめっこしましょう。ほとんどの場合はなんとかなります。(たぶん)

終わりに

最近 React Native at Airbnb: The Technology が話題ですが、 現在の弊社アプリの規模感だと中期的にはReact Nativeをこれからも使用していく予定です。(もっと大規模になるとAirbnbのように別の形をトライするかもしれませんが。)

ただ、React Nativeにおいても既存のiPhone/Android開発の知見は必要な部分も多いので、そういう方で新しい技術にもトライしてみたい方など絶賛募集中です。

またReactとかVueとかAngularとか触ってはいるけどまだアプリに手を出していない方も絶賛募集中です!

ぜひぜひオフィスに遊びに来てくださいー!

ご連絡はこちらよりお待ちしております。

AWS Summit、Google Cloud Nextに参加しました

弊社サービスも日頃からお世話になっている、AWSとGCP。
この両者が大規模なカンファレンスを東京で行うということで、参加してきました。
AWS Summitに3日間、Google Cloud Nextに2日間参加しましたので、ざっくりとしたレポートを掲載します。

なお、個々の発表には触れず、全体的な感想のみ記載します。

f:id:itamisky:20170616171550p:plain

会議の概要

基調講演があり、導入事例紹介や各サービスの紹介が行われるセッションがパラレルで開催される、という点は同じです。

AWS Summit

AWS Summitでは豊富なセッションがあり、実に様々な機能・事例の紹介がありました。
参加者も多く、広い会場が埋まっており勢いを感じました。
また、SummitとDev Dayに大別されて会場が分かれており、専門や興味に応じた参加が可能でした。
なお、Dev Dayの最終日は「Serverless Evolution Day」として全てサーバーレスに関する発表がなされるという思い切った構成でした。

Google Cloud Next

f:id:itamisky:20170616171602p:plain

Google Cloud Nextでも、基本的にはAWS Summitと同じような構成が取られ、発表内容も親しいものでした。
が、規模は比較すると小さく、セッション数も少ないため、興味に合うものが見当たらないことも発生しがちだったかと思います。
GCPが生まれた経緯を踏まえてか、一部セッションでGoogle社内システムの説明があり興味深かったです。
また、KubernetesやOpen API互換などに代表されるように、オープンであることを押し出しており、ロックインを自ら放棄する姿勢が素敵です。

発表の傾向

参加したセッションによりどうしても偏りがでますが、キーワードとして「マイクロサービス」が挙げられるかと思います。

単純なサーバーはパブリッククラウドへの移行が順調に進んでおり、更に各サービスを利用させ、新サービスをクラウド上で構築させる段階に入っているのかな、と感じました。

発表内容の大別

上記を踏まえ、発表内容は以下の2つに大別できるかと思います。

  1. オンプレからパブリッククラウドへの移行促進
  2. 各種サービスの利用促進

1は、引き続きオンプレで運用しているサービスをクラウド上に移行してもらうためのものです。
2は、AI系サービスの新規利用やマイクロサービス化など、今まで各自が実装していた機能を提供することで、利用量を増やすためのものです。
これはロックインにも繋がります。

この区分で言うと、1より2の発表が多く見え、何となく時代の流れを感じました。

その他

AWS Summitの企業ブースは、配っているグッズを貰うと手持ちの二次元コードをスキャンされ、後ほどメールが届くという効率的なシステムでした。

アンケートに答えると公式グッズがもらえます。
何がもらえるかは行ってのお楽しみ。

まとめ

AWS、GCP共に次々と公式サービス・関連サービスが追加され、開発者にとっては便利な世の中になっています。
低コストで高度な機能が実現できるため、これを使わない手はないでしょう。

どのような機能が実現できるかのイメージを掴むため、このような公式のイベントに参加してみてはいかがでしょうか。

おまけ

ハウテレビジョンでは、AWSやGCPの好きな仲間を募集しています。
Webエンジニア
エンジニアインターンシップ

Web Audio API + firebase + React + material-uiでノイズを組み合わせて評価してもらうサービスを作った

弊社ハウテレビジョンでは、週の1日をR&D dayとして、業務と直接関係しない技術を学んでみたり、今まであまり触れてこなかった領域を調べたりしています。

今回はWeb Audio APIを使ったサービスのプロトタイプを作ってみました。
音声処理が必要なWebサービスは限られますが、作ってみると意外に簡単で楽しいので、空いた時間に何か作ってみてはいかがでしょうか。

背景

弊社エンジニアの中で、noisliというWebサービスがにわかに流行りだしました。 https://www.noisli.com/

これが何かというと、「幾つかの環境音をミキシングし、好きなノイズを作って垂れ流せる」というサービスです。
似たようなサービスは幾つかありますが、作った環境音を簡単に共有出来る点が楽しいです。

さて、各人が好き勝手に作った環境音がSlackで共有されていたのですが、「これは良い」「これは無い」という評価がある程度偏っており、ノイズにも良し悪しがあることが分かりました。

そこで、今回はWeb Audio APIを使って、作ったノイズの評価をリアルタイムで閲覧できるようなサービスを作ってみます。
ここではプロトタイプとして、実運用で考慮すべき諸々は省いています。

開発の手順

  1. 環境構築
  2. Web Audio APIで単一サウンドを再生
  3. 複数サウンドを合成
  4. UIの作成
  5. 再生、評価できるサービスをfirebaseで作る

1はReact、2〜3はWeb Audio API、4はmaterial-ui、5はfirebaseがそれぞれ主なトピックです。

では順番に見てゆきます。

環境構築

f:id:itamisky:20170512133022p:plain

まずはサービスを作成できる環境を整えます。
今回はホビーなプロジェクトですので、以下のコマンドで最小限のReactアプリの土台を作ります。
$ create-react-app noize

Web Audio APIで単一サウンドを再生

Web Audio APIはMDNにドキュメントがありますので、まずそちらを読み進めてゆきます。
https://developer.mozilla.org/ja/docs/Web/API/Web_Audio_API
https://developer.mozilla.org/en-US/docs/Web/API/Web_Audio_API/Basic_concepts_behind_Web_Audio_API
https://developer.mozilla.org/ja/docs/Web/API/Web_Audio_API/Using_Web_Audio_API

Audio contextやBufferなどの基本概念に軽く触れた後は、サンプルコードを見つつ動かしてみます。

まずはこちらのコードを利用し、ランダムなノイズを生成し2秒間流してみます。
https://developer.mozilla.org/en-US/docs/Web/API/AudioContext/createBufferSource

componentDidMount() {
  const audioCtx = new (window.AudioContext || window.webkitAudioContext)();

  const channels = 2;
  const frameCount = audioCtx.sampleRate * 2.0;
  const buffer = audioCtx.createBuffer(channels, frameCount, audioCtx.sampleRate);

  for (let channel = 0; channel < channels; channel++) {
    const nowBuffering = buffer.getChannelData(channel);
    for (let i = 0; i < frameCount; i++) {
      nowBuffering[i] = Math.random() * 2 - 1;
    }
  }

  const source = audioCtx.createBufferSource();
  source.buffer = buffer;
  source.connect(audioCtx.destination);
  source.start();
}

createBufferで2秒間分のバッファを作り、そこにランダムな値を放り込んでゆきます。
作ったバッファは出力にAudioContext.destinationを繋ぎます。

これでページをロード(正確にはDOMをマウント)した際にノイズが流れるようになりました。テロですね。

音量調整

このままでは音が大きいので、全体の音量を調整してみます。 GainNodeをsoruceとdestinationの間に挟み、加工するイメージです。 https://developer.mozilla.org/en-US/docs/Web/API/GainNode

// set volume
const gainNode = audioCtx.createGain();
gainNode.gain.value = 0.05;

const source = audioCtx.createBufferSource();
source.buffer = buffer;
source.connect(gainNode);
gainNode.connect(audioCtx.destination);
source.start();

これで小さな音量で再生することができました。 valueは環境にあわせて設定してください。

ファイルを再生する

ずっとランダムなノイズを聞いていいると精神が疲弊してきますので、環境音を再生させてみます。
AudioContext.decodeAudioData() を利用するとファイルからの非同期読み込みが行えます。
https://developer.mozilla.org/en-US/docs/Web/API/AudioContext/decodeAudioData

読み込みはXMLHttpRequestFileReader のいずれかを用いて行えます。
今回はXHRでリソースを取得します。

windy.wav というwavファイルを用意し、public/sound 以下に配置します。
それをXHRで取得してオーディオバッファに渡し、以前と同じ流れで再生をさせます。
再生部分はplayという関数にまとめています。
ちなみに、source.loop = true; を付けるだけでループ再生してくれます。

componentDidMount() {
  const audioCtx = new (window.AudioContext || window.webkitAudioContext)();

  const request = new XMLHttpRequest();
  request.open('GET', 'sound/windy.wav', true);
  request.responseType = 'arraybuffer';
  request.onload = () => {
    const audioData = request.response;
    const source = audioCtx.createBufferSource();
    audioCtx.decodeAudioData(audioData).then((decodedData) => {
      source.buffer = decodedData;
      this.play(audioCtx, source);
      source.loop = true;
    });
  };

  request.send();
}

play(audioCtx, source) {
  // set volume
  const gainNode = audioCtx.createGain();
  gainNode.gain.value = 0.1;

  // connection
  source.connect(gainNode);
  gainNode.connect(audioCtx.destination);

  source.start();
}

再生・停止できるようにする

現在はロード時に強制的に再生するようにしていますが、これをボタンで再生・停止できるようにしてみます。
必要な処理として、以下が挙げられます。

  • 読み込んだオーディオデータを保存して再利用する
  • 再生・停止を切り替える処理を作る
  • ボタンを用意しonClickで切り替える

今回は再生位置は関係ない(再生する度最初から始まる)ので、再生位置の保存などは必要ありませんでした。

まず、オーディオデータを雑にthisに保存するよう変更します。

this.audioData = {
  windy: null,
}
...
request.onload = () => {
  const audioData = request.response;
  this.audioCtx.decodeAudioData(audioData).then((decodedData) => {
    this.audioData.windy = decodedData;
  });
};
...

次に、再生ボタンが押されたら、保存済みデータでsourceを作成する処理を作ります。

start() {
  this.source = this.connect(this.audioData.windy);
  this.source.start();
}


connect(data) {
  const source = this.audioCtx.createBufferSource();

  source.buffer = data;
  source.loop = true;

  // set volume
  const gainNode = this.audioCtx.createGain();
  gainNode.gain.value = 0.1;

  // connection
  source.connect(gainNode);
  gainNode.connect(this.audioCtx.destination);

  return source;
}

また、停止する処理も作成します。

stop() {
  if (this.source) {
    this.source.stop();
  }
}

これらを切り替えて表示にも反映されるよう、toggle関数を作ってonClick時に発火するようにします。

render() {
  const toggle = () => {
    this.state.playing ? this.stop() : this.start();
    this.setState({playing: !this.state.playing});
  };

  return (
    <div className="App">
      <div>{this.state.playing ? 'now playing...' : 'stopped'}</div>
      <button onClick={toggle}>Toggle</button>
    </div>
  );
}

完成です!実際に切り替わるかを試してみてください。

複数サウンドを合成

今までは単一のサウンドファイルを再生してきましたが、複数のサウンドを読み込み、合成して出力してみます。
グラフなので、ノードとエッジを増やすだけで同じように処理出来る気がしますね。

複数ファイルの読み込み

まずはオーディオデータ読み込みを複数ファイルに対応します。
ファイル名を指定してXHRの結果をキャッシュする処理を括りだし、それをファイル名のループで呼び出します。

loadAudioData(fileName, extension) {
  const request = new XMLHttpRequest();
  const soundDir = '/sound/';
  request.open('GET', `${soundDir}${fileName}.${extension}`, true);
  request.responseType = 'arraybuffer';

  request.onload = () => {
    const audioData = request.response;
    this.audioCtx.decodeAudioData(audioData).then((decodedData) => {
      this.audioData[fileName] = decodedData;
    });
  };

  request.send();
}

componentDidMount() {
  this.audioCtx = new (window.AudioContext || window.webkitAudioContext)();
  ['windy', 'rain'].forEach((name) => this.loadAudioData(name, 'wav'));
}

これで、複数のオーディオファイルデータを簡単に読み込めるようになりました。

サウンドの合成

DTMをやったことがある方には馴染みのある現象かと思いますが、複数の音源から好き勝手に音をだすと、一定のレベルを超えた所で音がビリビリに割れてしまいます。
そんな時はコンプレッサーと呼ばれるものを挟み、大きな音を絞って調整します。

Web Audio APIにもDynamicsCompressorNode というものが用意されており、同じような処理が可能です。
https://developer.mozilla.org/en-US/docs/Web/API/DynamicsCompressorNode

そのため、作成するグラフは、「複数のsource」→「コンプレッサー」→「全体のゲイン(音量)調整」→「出力」という流れになります。

実装としては、まず汎用的な処理にするため、sourceを作成する以下の処理を関数として括りだします。

makeSource(data) {
  const source = this.audioCtx.createBufferSource();
  source.buffer = data;
  source.loop = true;
  return source;
}

その後、複数のsourceをまとめて作成・connect・startさせる処理を記述します。

start() {
  const compressor = this.audioCtx.createDynamicsCompressor();
  const gainNode = this.audioCtx.createGain();
  gainNode.gain.value = 0.1;
  const outputNode = this.audioCtx.destination;
  compressor.connect(gainNode);
  gainNode.connect(outputNode);

  ['windy', 'rain', 'fireplace', 'kiva', 'lavasteam']
    .map((name) => this.makeSource(this.audioData[name]))
    .map((source) => {
      this.sources.push(source);
      source.connect(compressor);
    });

  this.sources.map((source) => source.start());
}

stop() {
  if (this.sources) {
    this.sources.map((source) => source.stop());
    this.sources = [];
  }
}

個々の音源の音量を調整する

個々の音源のバランスを調整するため、それぞれのsourceにもGainNodeをくっつけます。

['windy', 'rain', 'fireplace', 'kiva', 'lavasteam']
  .map((name) => this.makeSource(this.audioData[name]))
  .map((source) => {
    this.sources.push(source);

    const sourceGainNode = this.audioCtx.createGain();
    sourceGainNode.gain.value = 0.3;
    source.connect(sourceGainNode);
    sourceGainNode.connect(compressor);
  });

これで、0.3という固定値を持ったGainNodeを各ソースに紐付けることができました。
後ほど、ここは調整されたパラメータがやってくる予定です。

この段階で、スライダーで全体のゲインを変えられるようにしました(ソースは省略しますが)。
スライダーは範囲を[0, 1000]などにしておき、Gainに渡す際は1/1000すると細かく調整できて便利です。

f:id:itamisky:20170512133055p:plain

UIを作る

さて、このセクションは一休みとして、主にUIを作ります。

リストなどの表示をするのに、通常のulやliをそのまま使うと少し不格好です。
頑張ってCSSを当てても良いですが、ここではMaterial-UIを入れて表示を整えてみます。

今回はSlider、Table、TextField、Buttonの3つのみで事足りました。

http://www.material-ui.com/#/components/slider

http://www.material-ui.com/#/components/table

http://www.material-ui.com/#/components/text-field

http://www.material-ui.com/#/components/flat-button

初期状態の”Welcome to React”だった文字なども変えておきます。
結果、以下のような表示になりました。

f:id:itamisky:20170512133117p:plain

手抜き感はありますが、最低限使えるものにしています。
投票ボタンがキモですが、この段階ではとりあえず置いているだけで、クリックしても何も起きません。

Firebaseとの連携

投票結果を保存して共有するために、サーバーとDBが必要です。
ここではFirebaseの無料プランを使って、お手軽に開発をしてゆきます。

Firebaseの初期化は、コンソールからプロジェクトを作って初期化コードを埋め込むだけの簡単設計です。
https://console.firebase.google.com/?hl=ja

ただし、今回はReactから使うため、NPMとして使います。
https://www.npmjs.com/package/firebase

yarn add firebase -S

DBへの保存

そのままではDBに書き込むのにログインが必要なため、とりあえず全開放のルールを設定します。
FirebaseコンソールのDatabase以下、ルールタブから設定できます。

{
  "rules": {
    ".read": true,
    ".write": true
  }
}

もちろん、プロトタイプ以外では、しっかりとルールを設定しましょう。
全ての情報を誰でも閲覧・編集できてしまうため極めて危険です。

f:id:itamisky:20170512133354p:plain

パラメータとそれに応じた評価を格納すれば良いので、以下のような形式で保存してみます。

/
  - items
    - $id
      - params
        - windy: 0.2,
        - rain: 0.3,
        ...
      - score: 8

認証が無いためシンプルです。
itemの変更を検知するので、ユーザー数が増えそうならもう少し工夫をする必要がありますが、今回はプロトタイプなので手を抜いています。
実際にデータが入ると以下のようになります。

f:id:itamisky:20170512133138p:plain

DBにノイズのデータを書き込む

ユーザーが好きなパラメーターを投稿できるようにします。
まずはこのように適当なフォームを作り、入力を受け取れるようにします。

f:id:itamisky:20170512133156p:plain

“CREATE”ボタンが押されたら、フォームの値を取得し、合計が1.0の時のみデータベースに送信します。

const sum = Object.keys(this.state.values).reduce((result, key) => result + parseFloat(this.state.values[key]), 0.0);
if(sum !== 1.0) {
  this.setState({ message: `sum must be 1.0  - now: ${sum}` });
}
else {
  this.props.sendToDatabase(this.state.values);
  this.setState({ message: 'submitted!' });
}

送信処理は親から渡されており、子コンポーネントはそれにパラメーターを渡して発火するだけです。
送信処理はこれだけです。簡単ですね。

sendToDatabase = (params) => {
  this.db.ref('/items').push().set({
    params: params,
    score: 0,
  });
};

DBからノイズのパラメータを取得する

上記のような処理でデータが詰められてゆきますので、適切に取得する必要があります。
firebaseではDB上のパスを指定し、継続的に変更を検知することができます。
/items の値を検知するには以下のようにコールバックを指定します。

this.db.ref('/items').orderByChild('score').on('value', (snapshot) => {
  const items = snapshot.val();
  this.setState({
    items: items,
  })
})

orderByXXXで特定の値によりソートをすることができます。
なお、indexやlimitなどを考慮する必要がありますが、今回はプロトタイプなので大量のデータを受け取るのも許容してしまいます。

評価を書き込む

評価を変更する箇所ではトランザクション処理にして、複数人が同時に評価してもデータが破損しないようにします。
https://firebase.google.com/docs/database/server/save-data?hl=ja#section-transactions

this.db.ref(`/item/${this.state.playing}/score`).transaction(function (current_value) {
  return (current_value || 0) + value;
});

デプロイする

さて、一応機能ができた所で、デプロイして使ってもらいます。
Firebaseのホスティング機能、create-react-appのbuild機能により、以下のように簡単にデプロイすることができます。
https://firebase.google.com/docs/hosting/deploying?hl=ja

$ firebase init
(hosting.publicをbuildに書き換え)
$ firebase use --add
$ yarn build
$ firebase deploy

database.rulesができた場合は、以前ルールを書き換えた場合と同様に書き換えておきます。

完成

f:id:itamisky:20170512133216p:plain

f:id:itamisky:20170512133232p:plain

まとめ

今回はWeb Audio APIを使い、ちょっとしたサービスのプロトタイプを作ってみました。
create-react-appなどのテンプレート、material-ui、そしてFirebaseを使うとこのようなサービスが簡単に組み立てられますので、プロトタイピングにはオススメです。

Cloud Dataflow入門〜データ処理の実践

弊社ハウテレビジョンでは、週の1日をR&D dayとして、業務と直接関係しない技術を学んでみたり、今まであまり触れてこなかった領域を調べたりしています。

今回はCloud Dataflowに入門し、簡単なデータの分析コードを組み、動かしてみました。
とても簡単に強力な並列処理が出来るので、ログの分析などで活用できそうです。

f:id:itamisky:20170419112030j:plain

概略

Cloud Dataflowは、Google Cloudで提供されているデータ処理用のプラットフォームです。
2017/04現在、JavaとPythonのクライアントが提供されております(Pythonはβ版)。

「便利だよ!」という話はよく訊くのですが、実際に使ってみたことはなかったため、これを機に入門してみます。

できたこと

基本的なDataflowの使い方、そのコンセプトを把握しました。
また、簡単なデータ処理として、「Trump氏のツイート」を対象に「1週間ごとのつぶやき数」を集計、ただし「RTは除く」、という3つの条件でカウントをしました。

進めかた

利用方法をまとめてくれている有用なブログ記事もありますが、変化が激しいのとまとまった時間があるため、下記のGoogle公式のドキュメントを使って入門してみます。
https://cloud.google.com/dataflow/docs/

また、せっかくですので、Pythonのクライアントライブラリを使って進めます。

やったこと

クイックスタート

https://cloud.google.com/dataflow/docs/quickstarts/quickstart-python

セットアップ
まずは新規プロジェクトを作り、APIを有効にします。

https://d2mxuefqeaa7sj.cloudfront.net/s_8ACB1D17AA95783CAEC7C335CBA2166A66ED243120AE0FF2F7C766C37CADD651_1492145849905_+2017-04-14+12.06.51.png

なお、どのGCPのプロジェクトでも必要なgcloud コマンドのインストール、認証などは事前に済ませているものとします。
次に、Cloud Storageにバケットを作成します。

今回は、プロジェクト名が dataflow-research 、バケット名が st-dataflow-research として進めます。
また、現状Pythonのバージョンは2.7のみ使えますので、pyenvで固定しておきます。

必要なコマンドはpipになっているため、簡単にインストールできます。
pip install google-cloud-dataflow

動作確認
一応確認しておきます。
ローカルでの実行、リモートでの実行のそれぞれが載っているので、両方試しておくと無難です。

さて、公式ドキュメントどおりリモート実行を下記コマンドで行うとエラーが出ます(2017/04現在)。

python -m apache_beam.examples.wordcount \
  --project $PROJECT \
  --job_name $PROJECT-wordcount \
  --runner BlockingDataflowPipelineRunner \
  --staging_location $BUCKET/staging \
  --temp_location $BUCKET/temp \
  --output $BUCKET/output

エラーメッセージはこの通り。
ValueError: Unexpected pipeline runner: BlockingDataflowPipelineRunner. Valid values are DirectRunner, EagerRunner, DataflowRunner, TestDataflowRunner or the fully qualified name of a PipelineRunner subclass.

BlockingDataflowPipelineRunner が無いため、DataflowRunner に書き換えて実行します。
正常に実行されると5分ほどかかり、操作が完了します。
ステップごとに実行している様子はGCP上で確認できます。

https://d2mxuefqeaa7sj.cloudfront.net/s_8ACB1D17AA95783CAEC7C335CBA2166A66ED243120AE0FF2F7C766C37CADD651_1492145861146_+2017-04-14+13.56.03.png

Storageに結果が出力されていることも確認しておくと安心です。

パイプラインについて

サンプルが実行できたので、次にコンセプトを学んでゆきました。

公式ドキュメントが充実していますので、重要な所を拾い読みすると捗ります。
https://cloud.google.com/dataflow/model/pipelines

パイプラインはステップの有向グラフ と書かれており、分岐やループが出来ることが併せて述べられています。

変換の項目を見ると、パイプライン(中身はapache beam)は関数型言語で書くような感覚で記述すれば良いことがわかります。
実際サンプルで挙げられているコードも、以下のようにmapやflatMapなどをメソッドチェーン的に呼び出しており、見慣れた形になっています。

(p
 | beam.io.ReadFromText(my_options.input)
 | beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
 | beam.Map(lambda x: (x, 1))
 | beam.combiners.Count.PerKey()
 | beam.io.WriteToText(my_options.output))

サンプルを実行するとまたしてもエラーが出ます。
NameError: name 'argv' is not defined
これは単純にサンプルコードでimportが抜けている問題なので、 以下のように修正します。

@@ -1,3 +1,4 @@
+import sys
 import re

 import apache_beam as beam
@@ -17,7 +18,7 @@ class MyOptions(PipelineOptions):
                         required=True,
                         help='Output file to write results to.')

-pipeline_options = PipelineOptions(argv)
+pipeline_options = PipelineOptions(sys.argv)
 my_options = pipeline_options.view_as(MyOptions)

実行してみると、前回と同様の結果が得られました。

実践

https://cloud.google.com/dataflow/pipelines/constructing-your-pipeline

パイプラインのコンセプトをほんのりと理解したので、その周辺の理解を深めるため、実際にデータ処理を行ってみます。
今回は簡単なデータ処理として、「Trump氏のツイート」を対象に「1週間ごとのつぶやき数」を集計、ただし「RTは除く」、という3つの条件でカウントをします。

ちなみに、以下が対象の「本物」なTrump氏のTwitterです。
https://twitter.com/realDonaldTrump

データとしてツイートの一覧が必要なので、tweepyでさっくりと取得しました。
全ツイートは制限により取れないため、直近の約3000ツイート程度です。

import tweepy
import pandas

auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)

api = tweepy.API(auth)

data = []

def process_status(status):
    data.append([status.created_at, status.text.replace('\n', ' ')])

page = 1
while True:
    print("fetch page {0}".format(page))
    if len(data) > 0:
        print("  top: {0}".format(data[-1]))
    statuses = api.user_timeline('@realDonaldTrump', count=200, page=page)
    if statuses:
        for status in statuses:
            process_status(status)
    else:
        # All done
        break
    page += 1  # next page

df = pandas.DataFrame(data, columns=['created_at', 'text'])
df.to_csv('trump.tsv', sep='\t', index=False, encoding='utf-8')

ローカルで実行

毎回リモートで実行するのは時間がかかるので、ローカルでパイプライン処理を実行させます。
RunnerにDirectRunner を指定するだけでローカル実行されます。
処理がすぐ返ってくるので「実行されてない?」と思ってしまいますが、バックグラウンドで動いています。
しばらくすると結果が同じように出力されます。

パイプラインの組み立て

目的の処理を実現するため、今回は以下の流れで処理をしてゆきます。

  1. TSVファイルの読み込み
  2. RTのツイートを除外
  3. 各ツイートのタイムスタンプを修正する
  4. ウィンドウで区切る
  5. 集計用にデータを変換する(ウィンドウ番号をkeyとして持つ)
  6. カウント(groupBy + count)
  7. 結果を出力する

では、パイプラインのステップを順番に見てゆきます。

TSVファイルの読み込み
これは前のサンプルでもあった通り、beam.io.ReadFromText で読み込めます。
| beam.io.ReadFromText(my_options.input)

RTのツイートを除外
これはフィルタリングするだけで事足りそうです。
ツイート内容にRT @ が入っていれば除外とします。
beam.Filter というプロセッサがあるので、これをありがたく利用します。
| beam.Filter(``**lambda** x: 'RT @' **not in** x)

各行のタイムスタンプを設定する
BigQueryやStorageなどからではない、テキストから読み込んだファイルなどは、全ての行が共通のタイムスタンプを持っています。
データとして持っているものを自動で認識してくれる訳ではありませんので、自分で設定してあげる必要があります。
https://cloud.google.com/dataflow/model/windowing#TimeStamping

JavaではParDoという変換が用意されていますが、PythonではさらにMapとFlatmapが用意されています。
https://cloud.google.com/dataflow/model/par-do

さて、Pythonのサンプルは提供されていませんが、Githubのサンプルを漁った限りでは以下のようにwindow.TimestampedValue を使って出力をすれば良さそうです。
https://github.com/apache/beam/blob/d2b8b2886ce42f138b634d90208780bdce7e058e/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py#L68

まずは、入力データが以下のような\t で区切られたTSV形式になっているので、日時と内容を分離します。
2017-04-13 19:21:07 It was a great honor to welcome Atlanta's heroic first responders to the White House this afternoon! https://t.co/ZtC14AJ0xs
次に、タイムスタンプが日付形式になっているので、Unixタイムスタンプに変換します。
結果をwindow.TimestampedValue で返してあげれば、DoFnの完成です。

class ExtractUserAndTimestampDoFn(beam.DoFn):
    def process(self, context):
        import time
        from datetime import datetime
        [str_timestamp, text] = context.split('\t')
        timestamp = datetime.strptime(str_timestamp, '%Y-%m-%d %H:%M:%S')
        unixtime = int(time.mktime(timestamp.timetuple()))
        yield window.TimestampedValue(text, unixtime)

importを関数内で行っている ことに注意です。
グローバルに読み込むと、各関数が別のマシンで実行された際にimportがされておらずエラーになってしまいます。
https://cloud.google.com/dataflow/faq#nameerror-

作ったクラスはParDoで適用すればOKです。

| beam.ParDo(ExtractUserAndTimestampDoFn())

ウィンドウで区切る
タイムスタンプを一定の区間で区切るものとして、ウィンドウという仕組みがあります。
https://cloud.google.com/dataflow/model/windowing
データ集計ではよく出てくる方法ですので、標準で用意されていて便利です。

ウィンドウで区切るには、WindowInto という関数を利用します。
これも公式ドキュメントには無いので、Githubのサンプルから漁ると見つかります。

from apache_beam.transforms import WindowInto
ONE_WEEK_IN_SECONDS = 60 * 60 * 24 * 7
| WindowInto(FixedWindows(size=ONE_WEEK_IN_SECONDS))

集計用にデータを変換する
ウィンドウで区切ったデータを、(window-number,1) という形に変換し、groupByが出来るようにします。
これは以下のようなクラスを作り、いつも通りにParDoで適用します。

class WithWindow(beam.DoFn):
  def process(self, element, window=beam.DoFn.WindowParam):
      yield (str(window), 1)


| beam.ParDo(WithWindow())

カウントと出力
これは標準で提供されているプロセッサを適用させれば完了です。

| beam.combiners.Count.PerKey()
| beam.io.WriteToText(my_options.output))

処理のまとめ
処理部分のソースは以下のようになりました。

class ExtractUserAndTimestampDoFn(beam.DoFn):
    def process(self, context):
        [str_timestamp, text] = context.split('\t')
        timestamp = datetime.strptime(str_timestamp, '%Y-%m-%d %H:%M:%S')
        unixtime = int(time.mktime(timestamp.timetuple()))
        yield window.TimestampedValue(text, unixtime)


class SessionsToStringsDoFn(beam.DoFn):
  def process(self, element, window=beam.DoFn.WindowParam):
    yield u"{0}: {1}".format(str(element).encode('utf-8'), str(window).encode('utf-8'))


class WithWindow(beam.DoFn):
  def process(self, element, window=beam.DoFn.WindowParam):
      yield (str(window), 1)

(p
 | beam.io.ReadFromText(my_options.input)
 | beam.Filter(lambda x: 'RT @' not in x)
 | beam.ParDo(ExtractUserAndTimestampDoFn())
 | WindowInto(FixedWindows(size=ONE_WEEK_IN_SECONDS))
 | beam.ParDo(WithWindow())
 | beam.combiners.Count.PerKey()
 | beam.io.WriteToText(my_options.output))

各ステップでやることがはっきりしているので、分かりやすいですね。

結果
実行してみると、以下のような出力が得られました。

('[1479945600.0, 1480550400.0)', 39)
('[1461801600.0, 1462406400.0)', 60)
('[1480550400.0, 1481155200.0)', 35)
('[1476316800.0, 1476921600.0)', 160)
('[1466640000.0, 1467244800.0)', 78)
...
('[1484179200.0, 1484784000.0)', 47)
('[1483574400.0, 1484179200.0)', 51)
('[1490832000.0, 1491436800.0)', 38)

もちろん、数の代わりに平均を出したり、ひと月あたりに変換したり、ということがすぐできます。

リモートで実行

ソースとなるツイートのデータはStorage上にある必要がありますので、gs://st-dataflow-research/tweet_trump.tsv というファイル名で保存したとします。

さて、このまま意気揚々とリモートで実行をするとコケてしまいます。
これは、例えばdatetimeなど、グローバルでimportしているパッケージをParDoなどで使用しているためです。
ParDo内で実行された関数は、並列に別のマシンで実行されるため、importしていない事になってしまいます。
https://cloud.google.com/dataflow/faq#nameerror-
これを防ぐため、--save_main_session オプションを利用します。

他は同じように、リモートで実行するよう指示を出します。

PROJECT=dataflow-research
BUCKET=gs://st-dataflow-research

python pipeline-tweet.py \
  --project $PROJECT \
  --job_name $PROJECT-tweet-trump \
  --runner DataflowRunner \
  --staging_location $BUCKET/staging \
  --temp_location $BUCKET/temp \
  --output $BUCKET/output \
  --save_main_session

しばらくすると、ローカルで実行したものと同じ結果が出力されました!

まとめ

Dataflowに入門し、軽いデータ処理が行えるようになりました。
並列性の高い処理が簡潔に記述して実行できるため、特に大規模なデータを扱う際には重宝しそうです。

また、軽い気持ちでPythonクライアントを使うと辛いことになるということがわかりました。
ドキュメントが充実していないのはしょうがないにしても、重要な機能であるストリーミング実行がJavaでしか対応していないなど、まだまだ未対応なので使う際は強い意思を持ってご利用ください。

なお、ドキュメントがおかしい時はフィードバックを送ってみましょう。
上記で問題になっていた箇所には既にフィードバックを送っているので、もしかしたら直っているかもしれません。

Rustに入門した理由、チュートリアルの過程と感想

弊社ハウテレビジョンでは、週の1日をR&D dayとして、業務と直接関係しない技術を学んでみたり、今まであまり触れてこなかった領域を調べたりしています。

今回は複数人で集まってRustのチュートリアルを読み進め、実際に簡単なコードを組み、動かしてみました。
結果として、今から入門するのに向いた言語で、特に今まであまり多くの言語を触ったことのない方にお勧めできる、ということが分かりました。

f:id:itamisky:20170412104405p:plain

なぜRust?

Rustでは、通常のWebアプリケーション開発ではあまり気にすることのない、低レベルな処理を学べます。
また、いわゆる関数型言語の特徴を取り入れているためそのパラダイムを学んだり、静的型付けに慣れることができます。

C++やHaskellといった言語を触ったことのある人にはお馴染みですが、RubyやPythonしか触ったことがないというメンバーもいますので、これを気にまとめて学んでしまおうという寸法です。

利点をまとめると以下になります。

  • メモリの状態を意識できるようになる
  • 静的型付けに親しめる
  • 様々なパラダイムに触れることができる(特に関数型)
  • WebAssemblyで使えるかもしれない
  • 最近の話題についていける

このように多くの利点がある便利な言語ですので、Rustを選択しました。

以下では実際にチュートリアル(+ 一部リファレンス)を読んだ過程と、入門をおすすめする理由を記載してゆきます。

やったこと

Rustの公式チュートリアルを読み進め、実際にサンプルを動かしたり、改変したりしました。
公式ドキュメント https://doc.rust-lang.org/book/ を読んで進めてゆきました。

導入

まず、Rustの言語思想が述べられています。
「安全性」「速度」「並列性」をガベージコレクタ無しで実現するとのことで、肝になるメモリ管理をどうするのか、幾つか推測が話されました。

なお、組み込み環境が利用目的に入っているためか、対応プラットフォーム一覧がとても充実しています。
https://forge.rust-lang.org/platform-support.html

環境構築は、全員Macユーザーなので、brew install rust で簡単に導入ができました。

Hello World!

単純なプログラムですが、なかなか重要な情報が詰まっていました。
println!は関数ではなくマクロだということや、インデント・スペースの規約がさらりと述べられていたりします。

コンパイラの説明も、Rubyなどの動的言語しか触ったことがない人のため懇切丁寧に書かれており、入門に向いています。

続いて、ビルドツールであるCargoの説明が入ります。
cargo runcargo new といった便利なコマンドが紹介されました。
公式で用意されており、rails new の便利さに慣れてしまった我々も安心の親切設計です。

なお、作成されたtarget ディレクトリの中を覗いてみましたが、ディレクトリ名を見ただけでそれがどのようなものか分かる、綺麗な構造になっていました。
「依存関係にあるライブラリはここに入る」
「debugとreleaseビルドがある」
「インクリメンタルビルドをする」
などの後々説明が入る特徴もディレクトリ名を見るだけでおおよそ推測でき、ツールへの安心感が芽生えました。

数あてゲームのチュートリアル

ライブラリの読み込みや変数束縛やimmutabilityなどの説明が入り、いよいよコーディングに入ります。
紹介されているのはおおよそ他の言語でおなじみな機能ですが、懇切丁寧に説明されているので初見でも理解が進みます。

一部、パターンマッチや所有権の話も出てきますので、関数型やC++に触ったことが無いと少し理解が大変でした。

所有権はサンプルコードを書き換えて、内部の仕組みを推測しながら進めました。
「コピー渡しなのか、参照渡しなのか」の判別は少し疑問が残りましたが、後ほどCopy traitや所有権の詳細ページで出てきて氷解しますので、とりあえず先に進むことをおすすめします。

基本的な所を見てゆき、読み終わった後は簡単なプログラム(FizzBuzzなど)を各自作成しました。

面白かった点
色々いじってみた中で、以下のコードが生まれました。
後々ドキュメントを読み進めてゆくと出てくるのかと思いますが、挙動が面白かったです。

// plus_twoとして、plus_oneと同じ型の関数を用意する

let mut f: fn(i32) -> i32 = plus_one;
f = plus_two;  // ok!

let mut f = plus_one;
f = plus_two;  // error!

実際に動かすとコンパイルエラーが出るので、そのメッセージを見ると原因が分かります。
是非試してみてください。

チュートリアルの後

ここからは、各機能の詳細ページです。
「4.1 Variable Bindings」〜「4.8 Ownership」まで読み進めました。
特にOwnershipのページは今までの謎が解ける感覚がありますので、Rustの入門としては外せないページでした。
メモリ管理関連のページは3つに分かれており、残り2つも読むことが強く推奨されています。

是非次回は続きから読み進めたいところです。

言語周辺で良いと思ったところ

チュートリアルを終えて、言語の周辺で良いと思った点をまとめます。

  • ドキュメントが面白い & 懇切丁寧
    • 「その説明、いる?」というレベルの丁寧さ
    • ちょくちょく茶番が入る
  • インストールが楽
  • 便利なビルドツールが用意されている
    • ディレクトリ構造が分かりやすい
    • 新規プロジェクト用のコマンドが用意されている

言語の強力さ」「開始のお手軽さ」「ドキュメントの丁寧さ」を兼ね揃えており、これから学習するのに向いている言語だと感じました。

進め方についての感想

新しい言語を使うのに際し、リファレンス的にドキュメントを参照して学習を進めることもありますが、今回はドキュメントに沿って進めたため以下のような感想が挙がりました。

“新しい言語を学習する際に、チュートリアルを丁寧になぞるという事をあまりしないので、よい経験になった。複数人で読み合わせすることにより、理解があいまいな箇所、解説の意味・意図を勘違いしてしまうことを防げたと思う。” (20代 / 男性)

一方で、全員で同じドキュメントを読み進めたのみでしたので、以下の改善案も挙がっています。

“皆で同じことをやるのも良いが、アウトプットが似たり寄ったりになってしまうので、次はもくもくタイムと簡単な発表の時間を設けても良いかもしれない。” (20代 / 男性)

そのため、ある程度の時間を確保し、自習や好きなものを作ってみる時間を取るのが良さそうです。
時間を見積もるのは難しいですが、お題を先に決めておいて、それをどのように実装したか、発表とレビューを行うのも良いと思いました。

同じようにドキュメントを読まれる際は、是非参考にしていただければと思います。

まとめ

今回はRustのチュートリアルを利用し、入門してみました。

今までRailsやJSだけ書いてきた、という方にもお勧めな入門しやすい言語だと思います。
他の言語を学んでみたい場合のご参考になれば幸いです。

おまけ
ハウテレビジョンでは、プログラミング言語の好きな仲間を募集しています。
Webエンジニア
エンジニアインターンシップ

ICSE 2017 論文リーディング

弊社ハウテレビジョンでは、週の1日をR&D dayとして、業務と直接関係しない技術を学んでみたり、今まであまり触れてこなかった領域を調べたりしています。

f:id:itamisky:20170403103601j:plain

背景

最先端の研究を知るのに、カンファレンスの論文を読むのは有効な手段です。
直接役立つことは多くないですが、理論を応用できたり、今後どんな技術が出てくるのかを知ることができます。
何より楽しいです。

そこで今回は、ソフトウェア工学系のトップカンファレンスであるICSE(International Conference of Software Engineering)の論文をざっくり読んでゆきます。

折角なので、まだ開催されていない(2017/05月開催)ICSE 2017の論文を読みます。
まだなのにどうやって読むの?と思いますが、Preprintsとして事前に論文を公開してくれている場合があるので、それを利用します。
公式Twitterによると、おおよそ66%の論文がpreprintsとして集まっているようです。すごい。
https://twitter.com/ICSEconf/status/841892455809187840

preprintsはこちらに掲載されていますので、原文を読みたい方はこちらから辿ってください。
https://docs.google.com/spreadsheets/d/19rjBeNklsfFdggZ7Xj2h1oNpIXfZuR-hYrSQrk4xCfc/edit#gid=1276835202

注意

  • ICSEの全論文ではありません
    • 9つだけ
    • Technical Research Papersの中で、preprintsが公開されているもの
  • タイトルとアブストだけ読むため、手法の詳細には触れません

論文内容まとめ

Analyzing APIs Documentation and Code to Detect Directive Defects

http://www.zora.uzh.ch/134450/1/YU-ICSE.pdf

著者: Yu Zhou, Ruihang Gu, Taolue Chen, Zhiqiu Huang, Sebastiano Panichella and Harald Gall(南京大学・ロンドン大学・チューリッヒ大学)

概要の大雑把な訳:
APIドキュメントは開発者にとって最も重要なドキュメントだが、よくソースコードとずれが生じており、開発の効率や品質に関わる問題になっている。
本論文では、ソースコード理解と自然言語処理を利用し、自動でこのずれを検出する方法を提案している。
手法としては、引数の制約(NonNull、型など)と吐く可能性のある例外に関するAPIドキュメントの記述を利用する。
この内容を用いて、制約ソルバを使ってずれを検出する。
JDK1.8のAPIに対して実験し、2000 APIの中から1146個のずれを検出した。
プレシジョンは81.6%、リコールは82.0%となり、実際に使えることを示した。

感想: ドキュメントとずれている状態はよく直面する。ぜひとも早く普及して解消されて欲しい。ドキュメント中に「ずれてますよ」というマークがあるだけでも便利そう。

An Unsupervised Approach for Discovering Relevant Tutorial Fragments for APIs

http://oscar-lab.org/paper/icse_17_frapt.pdf

著者: He Jiang, Jingxuan Zhang, Zhilei Ren and Tao Zhang(大連大学、遼寧省の研究所、武漢大学、ハルビン工程大学)

概要の大雑把な訳:
開発者はどんどんAPIチュートリアルに頼るようになっている。
しかし、不慣れなAPIの中からチュートリアルのコード片を探すのは未だに困難である。
既存手法では、大量にコーパスを手動で作る必要があり、悩まされていた。
本論文では、FRAPT(APIをページランクとトピックによりコード片をリコメンドする)というアプローチを取り、自動で行えるようにしている。
FRAPTでは、APIに関連するチュートリアルとなるコード片を決定する上で生じる2つの課題に挑戦している。
1つ目は、「代名詞と変数の解像度問題」として、チュートリアルとなるコード片の中からAPIを特定し、抽象的な代名詞と変数をAPI名や即した文脈に置き換える、という問題。
2つ目は、「説明にならないコード片の検出問題」として、不要なコード片を除去するフィルターを作成する問題。
これらの問題を解決し、トピックモデルとページランクアルゴリズムの両方を適用することで相関係数の算出と関連するコード片の集約ができた。
オープンなデータに対して本手法を適用し、既存手法に比べF値で8.77%→12.32%に精度が向上した。
加えて、本手法の核となるコンポーネントの効率性も検証した。

感想: 既存手法でも同様かもしれないが、「開発者はAPIドキュメントのチュートリアルに頼っている」という切り口が良い。チュートリアルはAPIの習熟に大きく関わる要因なので、こちらも実務に直接関わってきそうな内容。

Efficient Detection of Thread Safety Violations via Coverage-Guided Generation of Concurrent Tests

http://mp.binaervarianz.de/icse2017-covcon.pdf

著者: Ankit Choudhary, Shan Lu and Michael Pradel(ダルムシュタット工科大学、シカゴ大学)

概要の大雑把な訳:
並列プログラムを書くのは難しく、困難な問題を隠してくれているスレッドセーフなクラスに頼ることが多い。
これらのクラスのテストは並列プログラムの正確性を担保する上で極めて重要な問題である。
テストの見逃しを減らすため、自動生成した並列テストを作成することは効果的な手法である。
ランダムに作成する既存手法があるが、これは計算コストが高い解析に依存していたり、特定のバグに依存していたりする(アトミック性など)。
本論文では、CovConというカバレッジでガイドしてテストを作成する手法を提案する。
2つのメソッドをペアにして、それらがどれくらい並列に実行されたかを計測し、その頻度に着目する。
本手法では特定のバグのパターンに依存するものではなく、あらゆる並列実行時のバグを検出できる可能性がある。
また、計算コストが高くなく、短時間で多くのテストを生成することが出来る。
本論文ではCovConを18個のスレッドセーフなJavaのクラスに適用し、17のバグを見つけた。
既存の5つの手法に比べ、CovConは短時間でかつより多くのバグを見つけた。
詳細には、38/47のケースでより早くバグを見つけ、22/47のケースでは4倍近い速度で検出をした。

感想: 手法としてはシンプルに見えるが、検出精度も速度も大きく向上しているらしく意外性がある。スレッドセーフなクラスには多くの人が頼っていると思うので、その裏側で日々研究がされているのを見ると面白い。

RClassify: Classifying Race Conditions in Web Applications via Deterministic Replay

http://www-bcf.usc.edu/~wang626/pubDOC/ZhangW17.pdf

著者: Lu Zhang and Chao Wang(バージニア工科大学、南カリフォルニア大学)

概要の大雑把な訳:
競合状態はWebアプリケーションでよく発生するが、原因を突き止めるのが難しく、修復するのも難しい。
既存の競合状態検出ツールはあるが、「ニセの検出(false positive)」が大量に出る。
ニセの検出とは、bogus(実際にはまず起こらない)、benign(エラーを引き起こさない)の両方を示す。
手動でこれらの検出結果を調べることは大変で間違えやすいため、これらの検出結果を提示することは非生産的である。
本論文ではプラットフォームに囚われない、決定性の実行に基づいた手法により、実際に起こり有害な競合状態を特定する。
手法としては、競合するイベントのペアを2つの異なった順番で実行し、プログラムの状態に及ぼす影響を評価する。
ここでは、「有害な競合」を (1)両方のイベント実行が実行完了すること、(2)それぞれが異なった状態を生み出すこと、という両方の条件を満たすものと定義する。
Fortune500の企業で使われている大規模で実際に使われているWebサイトに対して本手法を適用し、既存の手法を大きく上回る効果を示した。

感想: 概要だけでは詳細は分からないが、実際にかけてみやすい類のツールなので、公開されていたら類似ツールも含めて試してみよう。

Repairing Event Race Errors by Controlling Nondeterminism

http://cs.au.dk/~amoeller/papers/eventracecommander/paper.pdf

著者: Christoffer Quist Adamsen, Anders Møller, Rezwana Karim, Manu Sridharan, Frank Tip and Koushik Sen(オーフス大学、米サムスン研究所、ノースイースタン大学、カリフォルニア大学バークレー校)

概要の大雑把な訳:
モダンなWebアプリはイベントドリブンで書かれており、イベントハンドラが非同期にユーザーやシステムのイベントを処理する。
このスタイルでは、非決定性が致命的なエラーを生じさせうる。
近年の研究ではイベント競合とその分類(有害かそうでないか)にフォーカスをしている。
しかし、ソースコードをこれらの競合が起こらないようにするのは難しく間違えやすいため、よくない実行をさせない方が望ましい。
本論文では、JavaScriptで書かれたWebアプリケーションで発生するイベント競合を自動修正する手法を提案する。
この手法では、イベントを後で実行させたり破棄して競合を回避する、というポリシーを適用するため、イベントコントローラーを用意しブラウザ内のイベントハンドラのスケジューリングを制限する。
本手法はEventRaceCommanderというツールにし、Fortune500の中で大きな20のWebアプリケーションに対して適用し、100以上のイベント競合を検出した。
アプリケーションに依存しない修正ポリシーは、パフォーマンスやユーザー体験を大きく損なうこと無く十分にイベント競合を解消したが、特定の修正ポリシーを定めた方が望ましい場合もある。

感想: 1つ前の論文とは別のアプローチで競合状態に挑んでいる。スケジューリングの方法がかなり複雑になりそう。実行時に発生させないようイベント自体をずらすのは面白いが、逆に分かりにくいバグの原因が増えてしまいそう。

TRAVIOLI: A Dynamic Analysis for Detecting Data-Structure Traversals

https://rohanpadhye.github.io/travioli/paper.pdf

著者: Rohan Padhye and Koushik Sen(カリフォルニア大学)

概要の大雑把な訳:
トラバーサル(機械的に一部・全部のデータを辿る処理)はデータ構造に対する最も基礎的な処理である。
本論文ではTRAVIOLIという動的解析でトラバーサルを検知するツールを提案する。
ここでは、ループや再帰の中にある、配列やリスト・木などのトラバーサルを正確に検出することを可能にする、非環(acyclic)な実行コンテキストを導入する。
本論文では、「どのようにTRAVIOLIの実行結果をデータ構造のトラバーサルの可視化に利用するか」「パフォーマンスの劣化はどの程度か」「無駄なトラバーサルによる性能劣化の検出をどうするか」について説明する。
実際に使われている5つのJavaScriptのプログラムに適用し、誤検出は4%以下だった。
検出したうちの93.75%にパフォーマンステストを行った(訳注: 結果は概要にはない)。
また、D3とexpressに性能劣化を検出した。

感想: 面白いテーマ。概要を読む限り、動的解析だとどうしても誤検出が増えそうだが、どのようにして減らしているのかは読み込まないと分からなさそう。

ProEva: Runtime Proactive Performance Evaluation Based on Continuous-Time Markov Chains

http://www.comp.nus.edu.sg/~david/Publications/icse2017-preprint.pdf

著者: Guoxin Su, Taolue Chen, Yuan Feng and David Rosenblum(ウーロンゴン大学、ロンドン大学、シドニー工科大学、シンガポール国立大学)

概要の大雑把な訳:
ソフトウェアシステム、特にサービスは実行時のパフォーマンスが要求される。
もしパフォーマンスが落ちている場合、再設定によって対策をする。
しかし、その対策が反映されるまでしばらくラグがある。
そのため、現在のシステムを受動的に監視することは勿論大事だが、将来のパフォーマンスを予測することも大事である。
連続時間マルコフ連鎖(CTMC)は時間を区切ったパフォーマンスのメトリクスを算出するのに適している(例えば、ある将来の時間でパフォーマンスの劣化がどれくらい起こりうるかを求める)。
CTMCを活かす上で課題となるのが、遷移率などのモデルパラメーターを実行中に計測すること。
これらのパラメーターはシステムや環境によって頻繁に更新されるため、正確な値を与えるのが困難である。
本論文では、既存のCTMCモデルを、遷移率として不正確で区間の中での予測値を許容するよう拡張したProEvaというフレームワークを作った。
ProEvaは不正確なモデルの出力として、漸近的な式と境界を計算することをコアな手法としている。
精度とオーバーヘッドについても触れている。

感想: 数値予測の理論的な論文なので、読むのは骨が折れそう。サービス監視系のツール開発に特に役立つ内容に見える。

Clone Refactoring with Lambda Expressions

https://users.encs.concordia.ca/~nikolaos/publications/ICSE_2017.pdf

著者: Nikolaos Tsantalis, Davood Mazinanian and Shahriar Rostami Dovom(コンコルディア大学)

概要の大雑把な訳:
ラムダ式はJava8で関数型プログラミングをサポートするため導入され、関数を引数として渡すことで「動作のパラメーター化」を実現した。
既存のコードクローンは異なった動作をするものが大部分(Type2、Type3に分類されるもの)である。
しかし、筆者らの知る限りでは、これらのコードクローンをラムダ式を用いて共通化する研究は無かった。
本論文では、ふるまいの違うコードクローンを、ラムダ式を用いてリファクタリングできるかを調べる手法を提案する。
さらに、大規模なコードクローンのデータセットに対し適用し、適用可能性と、リファクタリングに使われたラムダ式の特徴を示している。
結果、他の手法ではリファクタリングできなかった多くのクローンに対して、ラムダ式を使ってリファクタリングができた。

感想: 普段からよくこの手のリファクタリングはするものの、そういえば研究されてるという話を聞かなかった。いい所を突いた感じがある。リファクタリング提案機能が備わっているIDEが増えているが、この機能が搭載されれば開発の強力なサポートになるのでは。期待。

Characterizing and Detecting Anti-patterns in the Logging Code

http://nemo9cby.github.io/resources/pubs/icse2017_chen.pdf

著者: Boyuan Chen and Zhen Ming Jack Jiang(SCALE Lab)

概要の大雑把な訳:
ログのスニペット(Log.infoやSystem.out.printlnなど)は開発者がソフトウェアに埋め込む。
ログコードが増える度に実行時のコンテキストが増えるが、メンテナンスの負担があり多すぎるログコードは望ましくない(訳注: 例えば、ログの中で出力している変数が無くなったり、NPEになったり)。
加えて、パフォーマンスが落ちたり、ディスクI/Oが増えたりする問題もある。
最近の研究では、効果的なロギング方法がしっかりと定められたコーディングガイドラインは無いとされている。
ロギングに関する先行研究の多くは、「どこに」「何を」ログを取るか、に主に取り組んでおり、「どのように」ログを取るか(ロギングコードの開発・保守性)に取り組んだ研究は非常に少ない。
本論文では、ロギングのアンチパターンを特徴づけ、及び検出を行い、「どのように」ログを取るかの問題に取り組む。
ロギングコードの大部分は対象となるコードの進化に伴い変化してゆき、残りはアンチパターンの解消のために修正される。
本研究では、3つのよくメンテナンスされているOSS(ActiveMQ、Hadoop、Maven)に対し352個のロギングコードに対する変更をチェックした。
その結果、6つのアンチパターンが見つかった。
この発見の有用性を示すため、これらのアンチパターンの検知をするLCAnalyzerというツールに実装した。
LCAnalyzerを使った実験では、リコールが95%、プレシジョンが60%でアンチパターンを発見し、未知のアンチパターンの発見にも使えることを示した。
また、フィードバックを得るため、64の代表的な検出結果を10個のOSSで修正して送った所、46個(72%)が(この時点で)マージされた。

感想: 普段開発する中で、たしかにロギング処理はとっ散らかりがち。実際に論文の先を読むと、「Nullable objects」「Wrong verbosity level」など、よくやってしまうものが多い。たかがログのコードと侮らず、ちゃんとメンテナンスしてゆくことが必要だと考えると、アンチパターンという分かりやすい形でまとめてくれているのはとても有用だと思う。

まとめ

まだまだ面白そうな論文がありますが、ここで一旦終わります。
ソフトウェア工学という分野の特性か、上記9つの中でも実用的な内容が多く、実際に使われるイメージが持てるものが多かったです。

ぜひ、上記概要で気になった論文があれば原文を読んでみてください。 また、残りの論文もタイトルを眺めて、面白そうなものを探すと良いかと思います。

おまけ

ハウテレビジョンでは、論文の好きな仲間を募集しています。
Webエンジニア
エンジニアインターンシップ