Amazon API Gateway による WebSocket API の作成

2019年9月23日

はじめに

Amazon API Gateway による WebSocket API の作成について。

環境

  • Windows 7 Professional SP1 64bit
  • Node.js

API Gateway の設定

Amazon API Gateway とは、Web API を作成する仕組みである。裏方には Lambda 関数などが使える。操作にはポリシー AmazonAPIGatewayAdministrator が必要。

試しに "Echo API" を作ってみる。

ベースの作成

  1. AWS マネジメントコンソールで API Gateway を開く。
  2. [今すぐ始める] を押す。
  3. "プロトコルを選択する" で "WebSocket" を選ぶ。
    (2021.1.2 追記) 最近は、一度 "REST API" で構築する必要があるみたい。
  4. "API名" に適当に名前をつける (ここでは "Echo API")。
  5. "ルート選択式" に "$request.body.action" と入れる。
  6. [API の作成] を押す。
  1. Lambda を開き、OnConnect、OnDisconnect、SendMessage 関数をそれぞれ作る。

OnConnect

exports.handler = (event, context, callback) => {
    console.log("OnConnect");

    const response = {
        statusCode: 200,
        body: JSON.stringify(""),
    };

    callback(null, response);
};

OnDisconnect

exports.handler = (event, context, callback) => {
    console.log("OnDisconnect");

    const response = {
        statusCode: 200,
        body: JSON.stringify(""),
    };

    callback(null, response);
};

SendMessage

exports.handler = async (event, context, callback) => {
    console.log("SendMessage");

    const data = JSON.parse(event.body).data;
    console.log("data", data);

    const response = {
        statusCode: 200,
        body: JSON.stringify(""),
    };

    callback(null, response);
};

Lambda のロールに AmazonAPIGatewayInvokeFullAccess をアタッチする。

  1. API Gateway に戻り、"新しいルートキー" に "sendmessage" と入れてチェックボタンをクリック。統合タイプを "Lambda 関数" とし、"Lambda 関数" に "SendMessage" と入れる。[保存] ボタンをクリックすると、"API Gateway に Lambda 関数を呼び出す権限を与えようとしています" と出るので、OK する。保存がなかなか終わらないように見えるかもしれないが、しばらくしてルートキーをクリックして設定画面が出てくれば OK。同様に、$connect、$disconnect それぞれに "OnConnect"、"OnDisconnect" 関数を設定する。
  2. ルートの [アクション] で "API のデプロイ" を選択。"デプロイされるステージ" で "[新しいステージ]" を選び、"ステージ名" に "dev" などと入れる。[デプロイ] ボタンをクリック。
  3. "dev ステージエディター" が表示される。上の方に "WebSocketURL: wss://xxxxxxxxxx.execute-api.ap-northeast-1.amazonaws.com/dev" などとあるが、これが接続先になる。

接続テスト

wscat でテストする。npm でインストール可能。

>npm install -g wscat

テスト。

>wscat -c wss:///xxxxxxxxxx.execute-api.ap-northeast-1.amazonaws.com/dev
connected (press CTRL+C to quit)
> { "action": "sendmessage", "data": "hello" }

"action" でルートを指定すると、対応する Lambda 関数にデータが渡される。$connect、$disconnect は接続時、接続断時に呼ばれる。上記の実行の結果、OnConnect、OnDisconnect、SendMessage それぞれのログに出力されていれば OK。

Echo の実装

SendMessage

const AWS = require("aws-sdk");

exports.handler = async (event, context, callback) => {
    console.log("SendMessage");

    const agma = new AWS.ApiGatewayManagementApi({
        apiVersion: "2018-11-29",
        endpoint: event.requestContext.domainName + "/" + event.requestContext.stage
    });

    const data = JSON.parse(event.body).data;
    console.log("data", data);

    const connectionId = event.requestContext.connectionId;
    console.log("connectionId", connectionId);

    try {
        await agma.postToConnection({ ConnectionId: connectionId, Data: data }).promise();
    } catch (e) {
        console.log("Error", e);
    }

    const response = {
        statusCode: 200,
        body: JSON.stringify(""),
    };

    callback(null, response);
};

API Gateway が接続先 ID として connectionId を渡してくるので、それに対してメッセージをそのまま送り返している。

テスト。

>wscat -c wss:///xxxxxxxxxx.execute-api.ap-northeast-1.amazonaws.com/dev
connected (press CTRL+C to quit)
> { "action": "sendmessage", "data": "hello" }
< hello

HTML からのテスト

echo_api_test.html

<!DOCTYPE html>
<html>
  <head>
    <meta charset="UTF-8">
    <title>Echo API テスト</title>
  </head>
  <body>
    <label for="messageInput">Message: </label><input type="text" id="messageInput">
    <input type="submit" value="Send" id="sendSubmit">

    <p id="output"></p>

    <script>
      const uri = "wss://xxxxxxxxxx.execute-api.ap-northeast-1.amazonaws.com/dev";

      const sendSubmit = document.getElementById("sendSubmit");
      const messageInput = document.getElementById("messageInput");
      const output =  document.getElementById("output");

      function write(msg) {
        output.innerHTML += msg + "<br>";
      }

      function sendMessage() {
        let ws = new WebSocket(uri);
        ws.onopen = function(e) {
          write("Connected");
          const msg = messageInput.value
          write("Send: " + msg);
          ws.send(`{ "action": "sendmessage", "data": "${msg}"}`);
        };

        ws.onmessage = function(e) {
          write("Response: " + e.data);
          ws.close();
        };

        ws.onclose = function(e) {
          write("Disconnected");
        };

        ws.onerror = function(e) {
          write("Error: " + e.data);
        };
      }

      sendSubmit.addEventListener("click", sendMessage);
    </script>
  </body>
</html>

例: チャットルームの実装

チャットルームを実装してみる。部屋 ID を指定して接続し、同じ部屋 ID で接続しているユーザーとだけメッセージをやりとりできるようにする。部屋 ID と接続 ID (connectionId) のペアを DynamoDB のテーブルとして保存しておいて、メッセージを送ってきた接続 ID と同じ部屋 ID と結びついたすべての接続 ID に対してメッセージを送る。

DynamoDB

  • テーブル: ConnectionTable
  • パーティションキー: roomId (String)
  • ソートキー: connectionId (String)

API Gateway/Lambda

入室処理

  • ルート: enterroom
  • Lambda 関数: EnterRoom

EnterRoom

const AWS = require("aws-sdk");
AWS.config.update({ region: process.env.AWS_REGION });
const docClient = new AWS.DynamoDB.DocumentClient();

exports.handler = (event, context, callback) => {
    const connectionId = event.requestContext.connectionId;
    const roomId = JSON.parse(event.body).data;

    console.log("EnterRoom", connectionId, roomId);

    const params = {
        TableName: process.env.TABLE_NAME,
        Item: {
            roomId: roomId,
            connectionId: connectionId
        }
    };

    docClient.put(params, function(err) {
        if (err) {
            console.error("Error", err);
            callback(new Error("Error"));
        } else {
            console.log("Sucess");
        }
    });

    const response = {
        statusCode: 200,
        body: JSON.stringify(""),
    };

    callback(null, response);
};
  • 環境変数: TABLE_NAME = DynamoDB テーブル名
  • ロールポリシー: AmazonDynamoDBFullAccess

DynamoDB テーブルに部屋 ID と接続 ID のペアを登録する。

退室処理

  • ルート: exitroom
  • Lambda 関数: ExitRoom

ExitRoom

const AWS = require("aws-sdk");
AWS.config.update({ region: process.env.AWS_REGION });
const docClient = new AWS.DynamoDB.DocumentClient();

exports.handler = (event, context, callback) => {
    const connectionId = event.requestContext.connectionId;
    const roomId = JSON.parse(event.body).data;

    console.log("EnterExit", connectionId, roomId);

    const params = {
        TableName: process.env.TABLE_NAME,
        Key: {
            roomId: roomId,
            connectionId: connectionId
        }
    };

    docClient.delete(params, function(err) {
        if (err) {
            console.error("Error", err);
            callback(new Error("Error"));
        } else {
            console.log("Sucess");
        }
    });

    const response = {
        statusCode: 200,
        body: JSON.stringify(""),
    };

    callback(null, response);
};
  • 環境変数: TABLE_NAME = DynamoDB テーブル名
  • ロールポリシー: AmazonDynamoDBFullAccess

DyamoDB テーブルから部屋 ID と接続 ID のペアを削除する。

メッセージの送信

  • ルート: sendmessage
  • Lambda 関数: SendMessage

SendMessage

const AWS = require("aws-sdk");
AWS.config.update({ region: process.env.AWS_REGION });
const docClient = new AWS.DynamoDB.DocumentClient();

exports.handler = async (event, context, callback) => {
    console.log("SendMessage");

    const agma = new AWS.ApiGatewayManagementApi({
        apiVersion: "2018-11-29",
        endpoint: event.requestContext.domainName + "/" + event.requestContext.stage
    });

    const msg = JSON.parse(event.body).data;
    console.log("data", msg);

    const connectionId = event.requestContext.connectionId;
    console.log("connectionId", connectionId);

    const params = {
        TableName: process.env.TABLE_NAME,
        ProjectionExpression: "roomId, connectionId",
        FilterExpression: "connectionId = :cid",
        ExpressionAttributeValues: {
            ":cid": connectionId
        }
    };

    let roomId;

    while (roomId === undefined) {
        try {
            const data = await docClient.scan(params).promise();

            console.log("Success");
            roomId = data.Items[0].roomId;
        } catch (e) {
            console.error("Error", e);
        }
    }

    console.log("roomId", roomId);

    const params2 = {
        TableName: process.env.TABLE_NAME,
        ProjectionExpression: "roomId, connectionId",
        KeyConditionExpression: "roomId = :rid",
        ExpressionAttributeValues: {
            ":rid": roomId
        }
    };

    let data;

    try {
        data = await docClient.query(params2).promise();
        console.log("Success");
    } catch (e) {
        console.error("Error", e);
        callback(new Error("Error"));
    }

    const postCalls = data.Items.map(async ({ roomId, connectionId }) => {
        try {
            await agma.postToConnection({ ConnectionId: connectionId, Data: msg }).promise();
        } catch (e) {
            if (e.statusCode === 410) {
                console.log(`Found stale connection, deleting ${connectionId}`);
                await docClient.delete({
                    TableName: process.env.TABLE_NAME,
                    Key: { roomId: roomId, connectionId: connectionId }
                }).promise();
            } else {
                throw e;
            }
        }
    });

    try {
        await Promise.all(postCalls);
    } catch (e) {
        console.log("Error", e);
        callback(new Error("Error"));
    }

    const response = {
        statusCode: 200,
        body: JSON.stringify(""),
    };

    callback(null, response);
};
  • 環境変数: TABLE_NAME = DynamoDB テーブル名
  • ロールポリシー: AmazonDynamoDBFullAccess, AmazonAPIGatewayInvokeFullAccess

メッセージを送ってきた接続 ID で DynamoDB テーブルから scan() で部屋 ID を取得し、その部屋 ID で DynamoDB テーブルから query() で同じ部屋 ID の接続 ID リストを得る。その接続 ID リストのそれぞれに対してメッセージを送る。部屋 ID を取得する部分はループにしてある。入室時にも「入室しました」というメッセージを送るためにこの関数が用いられるが、DynamoDB の更新が間に合わずに部屋 ID の取得に失敗することがあるようで、その対策である。無限ループの形ではあるが、Lambda 関数には時間制限があるので、最悪でもタイムアウトになるだけである。

チャット画面

チャット画面を HTML5/JavaScript で実装する。WebSocket で接続してメッセージを送る。

chat_sample.html

<!DOCTYPE html>
<html>
  <head>
    <meta charset="UTF-8">
    <title>チャットサンプル</title>
  </head>
  <body>
    <h1>チャットサンプル</h1>
    <div>
      <label for="userNameInput">ユーザー名 </label><input type="text" id="userNameInput">
      <label for="roomIdInput">部屋 ID </label><input type="text" id="roomIdInput">
      <input type="button" value="入室" id="enterRoomButton">
      <input type="button" value="退室" id="exitRoomButton">
    </div>
    <div>
      <label for="messageInput">メッセージ </label><input type="text" id="messageInput">
      <input type="button" value="送信" id="sendButton">
    </div>

    <hr>

    <p id="output"></p>

    <script>
const uri = "wss://xxxxxxxxxx.execute-api.ap-northeast-1.amazonaws.com/dev";
let ws = null;

const userNameInput = document.getElementById("userNameInput");
const roomIdInput = document.getElementById("roomIdInput");
const enterRoomButton = document.getElementById("enterRoomButton");
const exitRoomButton = document.getElementById("exitRoomButton");

let userName = "";
let roomId = "";

exitRoomButton.disabled = true;

const messageInput = document.getElementById("messageInput");
const sendButton = document.getElementById("sendButton");

messageInput.disabled = true;
sendButton.disabled = true;

const output =  document.getElementById("output");

function write(msg) {
    output.innerHTML += msg + "<br>";
}

function log(msg) {
    output.innerHTML += "<font color=gray>" + msg + "</font><br>";
}

function enterRoom() {
    ws.send(`{ "action": "enterroom", "data": "${roomId}"}`);
}

function exitRoom() {
    ws.send(`{ "action": "exitroom", "data": "${roomId}"}`);
}

function send(msg) {
    const str = `${userName} (${roomId}): ${encodeURI(msg)}`;
    ws.send(`{ "action": "sendmessage", "data": "${str}"}`);
}

// sleep
// ref. https://stackoverflow.com/questions/951021/what-is-the-javascript-version-of-sleep
function sleep(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}

enterRoomButton.onclick = () => {
    userName = userNameInput.value;
    roomId = roomIdInput.value;

    if (userName === "") {
        alert("ユーザー名を設定してください。");
        return;
    }

    if (roomId === "") {
        alert("部屋 ID を設定してください。");
        return;
    }

    enterRoomButton.disabled = true;

    ws = new WebSocket(uri);

    ws.onopen = (e) => {
        log("Connected");
        send("[入室しました]");
        enterRoom();

        userNameInput.disabled = true;
        roomIdInput.disabled = true;
        exitRoomButton.disabled = false;
        messageInput.disabled = false;
        sendButton.disabled = false;
    };

    ws.onmessage = (e) => {
        write(decodeURI(e.data));
    };

    ws.onclose = (e) => {
        log("Disconnected");

        userNameInput.disabled = false;
        roomIdInput.disabled = false;
        enterRoomButton.disabled = false;
        messageInput.disabled = true;
        sendButton.disabled = true;
    };

    ws.onerror = function(e) {
        log("Error: " + e.data);
    };
}

exitRoomButton.onclick = async () => {
    exitRoomButton.disabled = true;
    send("[退出しました]");
    await sleep(1000);
    exitRoom();
    ws.close();
}

sendButton.onclick = () => {
    const msg = messageInput.value;

    if (msg === "") {
        alert("メッセージを入れてください。");
        return;
    }

    send(msg);

    messageInput.value = "";
}
    </script>
  </body>
</html>

ユーザー名と部屋 ID を設定して [入室] ボタンを押すと、接続がうまくいけば "[入室しました]" と出る。メッセージを入力して [送信] ボタンを押すと、同じ部屋 ID の接続先 (自分自身も含む) にメッセージが送られる。[退室] ボタンを押すと "[退室しました]" と出て接続が切られる。そのまま処理を書くと "[退室しました]" と返ってくる前に接続が切れてしまうので、1 秒間だけ待ってから接続を切るようにしている。

いくつかのウインドウで開いてそれぞれ接続すると、DynamoDB テーブルに接続情報が登録されていることが確認できる。きちんと退室するとそれらはちゃんと消える。実用を考えると、接続情報が残ってしまったときの処理も必要と考えられる。

Lambda 関数からの接続

Lambda 関数から WebSocket で接続してメッセージを送ってみる。

index.js

const WebSocket = require('websocket').w3cwebsocket;

const uri = "wss://xxxxxxxxxx.execute-api.ap-northeast-1.amazonaws.com/dev";
const roomId = "room1";

function sleep(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}

exports.handler = (event) => {
    const ws = new WebSocket(uri);

    ws.onopen = async (e) => {
        ws.send(`{"action": "enterroom", "data": "${roomId}"}`);
        await sleep(500);
        ws.send(`{"action": "sendmessage", "data": "message from Lambda function"}`);
        await sleep(500);
        ws.send(`{"action": "exitroom", "data": "${roomId}"}`);
        await sleep(500);
        ws.close();
    };

    const response = {
        statusCode: 200,
        body: JSON.stringify(""),
    };
    return response;
};

次々にメッセージを送るとうまくいかないので、適当に待ちを入れている。

ローカルで websocket モジュールを用意する。

>npm init
>npm install --save websocket

フォルダの中身を zip 圧縮してアップロード。

参考