--
先日、以下の記事を書きました。
WebSocketとWAMPとRatchetに関するメモ
http://madroom-project.blogspot.jp/2013/05/websocketwampratchet.html
実際にFuelPHPのRatchetパッケージで、WAMPのPubSubとRPCを試してみました。
https://github.com/mp-php/fuel-packages-ratchet
具体的には、以下の機能を作ってみました。
* 指定したトピックに配信する (PubSub)
* 指定したトピックを購読する (PubSub)
* 指定したトピックを購読解除する (PubSub)
* 購読中のトピック一覧を取得する (RPC)
以下、htmlと、RatchetパッケージのRatchet_Wampクラスを継承したクラスのソースです。
-- public/wamp_test.html(実際にはviewファイル) --
* autobahn.min.jsはWampServerと通信を行うためのJSライブラリです。
* autobahn.min.jsは http://autobahn.ws/js/downloads からダウンロード出来ます。
* autobahn.min.jsのライセンスは http://autobahn.ws/js に"MIT"と記されています。
* "wsuri"の値は変更して下さい。
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>FuelPHP x Ratchet WAMP Test</title>
<script type="text/javascript" src="http://ajax.googleapis.com/ajax/libs/jquery/1/jquery.min.js"></script>
<script type="text/javascript" src="assets/js/autobahn.min.js"></script>
</head>
<body>
<!--配信 (WampServerのonPublishメソッドが呼ばれる)-->
<div id="publish">
<select>
    <option value="topic_1">Topic 1</option>
    <option value="topic_2">Topic 2</option>
    <option value="topic_3">Topic 3</option>
    <option value="invalid_topic">Invalid Topic</option>
</select>
<input type="text" />
<button>Publish</button>
</div>
<hr />
<!--購読 (WampServerのonSubscribeメソッドが呼ばれる)-->
<div id="subscribe">
<select>
    <option value="topic_1">Topic 1</option>
    <option value="topic_2">Topic 2</option>
    <option value="topic_3">Topic 3</option>
    <option value="invalid_topic">Invalid Topic</option>
</select>
<button>Subscribe</button>
</div>
<hr />
<!--購読解除 (WampServerのonUnSubscribeメソッドが呼ばれる)-->
<div id="unsubscribe">
<select>
    <option value="topic_1">Topic 1</option>
    <option value="topic_2">Topic 2</option>
    <option value="topic_3">Topic 3</option>
    <option value="invalid_topic">Invalid Topic</option>
</select>
<button>Unsubscribe</button>
</div>
<hr />
<!--RPC (WampServerのonCallメソッドが呼ばれる)-->
<div id="rpc">
<select>
    <option value="get_subscribing_topics">Get Subscribing Topics</option>
    <option value="invalid_method">Invalid Method</option>
</select>
<button>Call</button>
</div>
<hr />
<p>Check your console.</p>
<script>
$(document).ready(function() {
    var sess; // WampServerとのコネクション
    var wsuri = 'ws://example.com:[ポート番号]';
    /**
     * 使い方等: http://autobahn.ws/js
     */
    sess = new ab.Session(wsuri,
        // コネクション接続時のコールバック関数
        function() {
            console.log("Connected!");
        },
        // コネクション切断時のコールバック関数
        function(reason) {
            switch (reason) {
                case ab.CONNECTION_CLOSED:
                    // 意図した切断の場合?
                    console.log("Connection was closed properly - done.");
                break;
                case ab.CONNECTION_UNREACHABLE:
                    // WampServerに到達できなかった場合?
                    console.log("Connection could not be established.");
                break;
                case ab.CONNECTION_UNSUPPORTED:
                    // ブラウザがWebSocketをサポートしていない場合
                    console.log("Browser does not support WebSocket.");
                break;
                case ab.CONNECTION_LOST:
                    // 意図しない切断の場合?
                    console.log("Connection lost - reconnecting ...");
                    // 1秒後に再接続を試みる
                    window.setTimeout(connect, 1000);
                break;
            }
        }
    );
    // 配信 (WampServerのonPublishメソッドが呼ばれる)
    $("#publish > button").click(function() {
        var input = $("#publish > input");
        var select = $("#publish > select");
        console.log("-- Publish --");
        if(input.val().length == 0) {
            console.log("Input is empty.");
        } else {
            console.log("Topic: " + select.val());
            console.log("Input: " + input.val());
            sess.publish(select.val(), JSON.stringify({input: input.val()}));
            input.val('');
        }
    });
    // 購読 (WampServerのonSubscribeメソッドが呼ばれる)
    $("#subscribe > button").click(function() {
        var select = $("#subscribe > select");
        console.log("-- Subscribe --");
        console.log("Topic: " + select.val());
        sess.subscribe(select.val(), function (topic, event) {
            console.log("-- Received --");
            console.log("Topic: " + topic);
            console.log("event: " + event);
        });
    });
    // 購読解除 (WampServerのonUnSubscribeメソッドが呼ばれる)
    $("#unsubscribe > button").click(function() {
        var select = $("#unsubscribe > select");
        console.log("-- Unsubscribe --");
        console.log("Topic: " + select.val());
        try {
            sess.unsubscribe(select.val());
        } catch(e) {
            console.warn(e);
        }
    });
    // RPC (WampServerのonCallメソッドが呼ばれる)
    $("#rpc > button").click(function() {
        var select = $("#rpc > select");
        console.log("-- RPC --");
        console.log("Method: " + select.val());
        sess.call(select.val()).then(function (result) {
           // do stuff with the result
           console.log(result);
        }, function(error) {
           // handle the error
           console.log(error);
        });
    });
});
</script>
</body>
</html>
-- fuel/app/classes/ratchet/wamp/test.php --* バリデーションとか、細かな処理は抜けています。
<?php
class Ratchet_Wamp_Test extends Ratchet_Wamp
{
    // トピック一覧
    private $topics = array();
    /**
     * 切断
     * 
     * @param  \Ratchet\ConnectionInterface $conn
     */
    public function onClose(\Ratchet\ConnectionInterface $conn) {
        // 全てのトピックを購読解除
        foreach ($this->topics as $topic)
        {
            $this->onUnSubscribe($conn, $topic);
        }
    }
    /**
     * 配信
     * 
     * @param  \Ratchet\ConnectionInterface $conn
     * @param  string|\Ratchet\Wamp\Topic $topic
     * @param  string $event
     * @param  array $exclude
     * @param  array $eligible
     */
    public function onPublish(\Ratchet\ConnectionInterface $conn, $topic, $event, array $exclude, array $eligible) {
        Log::debug('********** '.__FUNCTION__.' begin **********');
        Log::debug('$topic : '.$topic);
        Log::debug('$event : '.$event);
        Log::debug('$exclude : '.print_r($exclude, true));
        Log::debug('$eligible : '.print_r($eligible, true));
        Log::debug('********** '.__FUNCTION__.' end **********');
        // 不正なトピック
        if( ! in_array($topic, array('topic_1', 'topic_2', 'topic_3')))
        {
            return;
        }
        $json = json_decode($event);
        // トピックに対する購読者が存在する場合、配信
        if (array_key_exists($topic->getId(), $this->topics))
        {
            $topic->broadcast(Security::htmlentities($json->input));
        }
    }
    /**
     * 購読
     * 
     * @param  \Ratchet\ConnectionInterface $conn
     * @param  string|\Ratchet\Wamp\Topic $topic
     */
    public function onSubscribe(\Ratchet\ConnectionInterface $conn, $topic) {
        Log::debug('********** '.__FUNCTION__.' begin **********');
        Log::debug('$topic : '.$topic);
        Log::debug('********** '.__FUNCTION__.' end **********');
        // 不正なトピック
        if( ! in_array($topic, array('topic_1', 'topic_2', 'topic_3')))
        {
            return;
        }
        // トピック一覧にトピックを追加
        if (!array_key_exists($topic->getId(), $this->topics))
        {
            $this->topics[$topic->getId()] = $topic;
        }
    }
    /**
     * 購読解除
     * 
     * @param  \Ratchet\ConnectionInterface $conn
     * @param  string|\Ratchet\Wamp\Topic $topic
     */
    public function onUnSubscribe(\Ratchet\ConnectionInterface $conn, $topic) {
        Log::debug('********** '.__FUNCTION__.' begin **********');
        Log::debug('$topic : '.$topic);
        Log::debug('********** '.__FUNCTION__.' end **********');
        // 不正なトピック
        if( ! in_array($topic, array('topic_1', 'topic_2', 'topic_3')))
        {
            return;
        }
        // トピックからコネクションを削除
        $topic->remove($conn);
        // トピックの購読者が存在しない場合、トピック一覧からトピックを削除
        if ($topic->count() == 0)
        {
            unset($this->topics[$topic->getId()]);
        }
    }
    /**
     * RPC
     * 
     * @param  \Ratchet\ConnectionInterface $conn
     * @param  string $id
     * @param  string|\Ratchet\Wamp\Topic $fn
     * @param  array $params
     * @return \Ratchet\Wamp\WampConnection
     */
    public function onCall(\Ratchet\ConnectionInterface $conn, $id, $fn, array $params) {
        Log::debug('********** '.__FUNCTION__.' begin **********');
        Log::debug('$id : '.$id);
        Log::debug('$fn : '.$fn);
        Log::debug('$params : '.print_r($params, true));
        Log::debug('********** '.__FUNCTION__.' end **********');
        switch ($fn) {
            // 購読しているトピック一覧を取得
            case 'get_subscribing_topics':
                $subscribing_topics = array();
                Log::debug('********** Topics begin **********');
                foreach ($this->topics as $topic)
                {
                    Log::debug('$topic : '.$topic);
                    Log::debug('$topic->count() : '.$topic->count());
                    $topic->has($conn) and $subscribing_topics[] = $topic;
                }
                Log::debug('********** Topics end **********');
                return $conn->callResult($id, Security::htmlentities($subscribing_topics));
            break;
            // エラー処理
            default:
                $errorUri = 'errorUri';
                $desc = 'desc';
                $details = 'details';
                /**
                 * \Ratchet\Wamp\WampConnection
                 * 
                 * callError($id, $errorUri, $desc = '', $details = null)
                 */
                return $conn->callError($id, $errorUri, $desc, $details);
            break;
        }
    }
}
/* end of file test.php */
wamp_test.htmlにアクセスすると、以下のような画面になります。上から順に
* 選択したTopicでメッセージを配信
* 選択したTopicを購読
* 選択したTopicを購読解除
* 選択したメソッドをRPCパターンでコール
となります。尚、各種情報は console.log() しています。
http://madroom-project.blogspot.jp/2013/04/fuelphp-x-ratchet.html
もし追加したら当記事にも追記します。
次は、ZeroMQでHTTPサーバやタスクからのPublishか。

No comments:
Post a Comment