読者です 読者をやめる 読者になる 読者になる

【Aws】SQSを使ってみた

はじめに

やりたかったことはアクセスが多いサービスでDBへのデータの登録など時間がかかりそうな処理を非同期で行うということ。
ちなみにデータの更新結果によって画面の表示がかわるような場合では使用できません。
今回は主にログデータの登録なのでこの形で実装することにしました

ざっくりsqsを使った処理の流れ

1. webサーバーからsqsキューへメッセージ送信
2. バッチ処理などで定期的にsqsのメッセージを処理する
2-1. sqsからメッセージを取得
2-2. 取得したメッセージからDBへの登録処理などを行う
2-3. 処理が正常に行えたらsqsのメッセージを削除

という流れになるかと思います。
2-2で正常に処理できない場合はsqsのメッセージの削除をしなければ次回再取得できるため、処理のとりこぼしが防げます

注意すべきこと

同じメッセージが複数回取得できてしまうことがあるので、その場合を想定した作りとする。
キューからの取得回数も取得できるので、当初はこれを使用しようとしたのだがawsコンソールからキューを見ると取得回数もカウントアップしてしまうため今回はこれを使うことを断念しました。
ちなみにApproximateReceiveCountという属性です
なので処理が行えたメッセージはアプリ側でも個別で管理するのがよいのではないでしょうか

実装

awsコンソールからsqsキューは作成済みの前提で。
phpで実装します

sqsキューへメッセージ送信
<?php
require_once(VENDORPATH.'aws/aws-autoloader.php');
use Aws\Sqs\SqsClient;
use Aws\Sqs\Exception\SqsException;

try {
    $client = new SqsClient(array(
        "region"   => "ap-northeast-1",
        "version"  => "latest",
    ));
    $client->sendMessage(array(
        "QueueUrl" => "https://sqs.ap-northeast-1.amazonaws.com/xxxxxxx/example", // xxxxxxxx=キューのID example=キュー名
        "MessageBody" => json_encode(array(
            // キューへ送信するデータをJSON形式でセット
            "id" => "xxxxxxxx",
            "datetime" => date("Y-m-d H:i:s"),
        )),
    ));
} catch(SqsException $e) {
    // エラー処理
}
sqsからメッセージを取得して処理を行う(バッチ)
<?php
require_once(VENDORPATH.'aws/aws-autoloader.php');
use Aws\Sqs\SqsClient;
use Aws\Sqs\Exception\SqsException;

function getSqsMessage()
{
    $sqs = new SqsClient(array(
        "region"   => "ap-northeast-1",
        "version"  => "latest",
    ));
    return $sqs->receiveMessage(array(
        "QueueUrl" => "https://sqs.ap-northeast-1.amazonaws.com/xxxxxxx/example", // xxxxxxxx=キューのID example=キュー名
        'MaxNumberOfMessages' => 10, // キューから取得する件数(10件がMax)
        'AttributeNames' => array('All'), // 属性も取得しておく
    ));
}

function deleteSqsMessage($entries)
{
    $sqs = new SqsClient(array(
        "region"   => "ap-northeast-1",
        "version"  => "latest",
    ));
    return $sqs->deleteMessageBatch(array(
        "QueueUrl" => "https://sqs.ap-northeast-1.amazonaws.com/xxxxxxx/example", // xxxxxxxx=キューのID example=キュー名
        "Entries" => $entries,
    ));
}

// 100件ずつ処理する
// sqsから10件ずつしか取得できないのでループして100件取得するようにする
$messageList = array();
for ($i = 0; $i < 10; $i++) {
    $start_time = microtime(true);

    // キューから10件データ取得                                                                                                                                                                  
    $result = getSqsMessage();

    // 無ければ処理終了                                                                                                                                                                          
    if (!isset($result['Messages'])) {
        break;
    }
    $messageList = array_merge($messageList, $result["Messages"]);
}

// 処理が正常に行えたメッセージのSQS情報を保持
$sqsKeys = array();

// SQSから取得したメッセージでやりたい処理を行う
foreach ($messageList as $k => $message) {
    $json = json_decode($message['Body'], true);

    // DBなどから$message["MessageId"]が処理済みでないかを確認する
    if (処理済みでない場合) {
        // 何らかの処理をおこなう
    }

    if (正常に処理終了 || 既に処理済みの場合) {
        $sqsKeys[] = array(
            "Id" => $message["MessageId"],
            "ReceiptHandle" => $message['ReceiptHandle'],
        );
        // $message["MessageId"]をDBなどへ保存
    }
}

// 正常に処理が行えたメッセージを削除
// ※こちらもMaxで10件づつしか処理できないのでループで繰り返す
foreach (array_chunk($sqsKeys, 10) as $keys) {
    deleteSqsMessage($keys);
}

おわりに

とりあえずこんな感じで実装できました。
削除済みでないキューで同じものが複数回送信されることがあるというのがちょっと面倒ですが、、、
そこはアプリごとに実装します

以上です