May 4, 2013

FuelPHP x RatchetのWAMPにタスクからメッセージを配信してみた

前回の記事で、RatchetのWAMPの使い方を簡単に確認してみました。

FuelPHP x RatchetでWAMPのPubSubとRPCを試してみた
http://madroom-project.blogspot.jp/2013/05/fuelphp-x-ratchetwamppubsubrpc.html

前回の記事もそうですが、これまでのサンプル等は、全て配信者がクライアント(ブラウザ)です。

対して、今回はFuelPHPのタスクから配信してみます。また、未確認ですが、当然、Controller等からも可能なはずです。

前回の記事のサンプルと異なるのは
* htmlとRatchetのWampServerにpublishの機能が無い
* 代わりに、タスクからZeroMQ経由でpublishする
* RatchetのWampServerにzmqCallbackというメソッドを用意する
となります。尚、"zmqCallback"というメソッド名はRatchetパッケージのRatchetタスクのwampメソッドで登録しているだけなので、変更可能です。

ZeroMQのインストール手順については、以下を参考にして下さい。

FuelPHPでWebSocketを扱うパッケージを作りました
http://madroom-project.blogspot.jp/2013/04/fuelphpwebsocket.html

ZeroMQとはなんぞや。については、後日、別途まとめたいなと思っていますm(_ _)m
http://www.zeromq.org/

以下、html、RatchetパッケージのRatchet_Wampクラスを継承したクラス、タスクのソースです。
-- public/wamp_test2.html(実際にはviewファイル) --
* autobahn.min.jsはWampServerと通信を行うためのJSライブラリです。
* autobahn.min.jsは http://autobahn.ws/js/downloads からダウンロード出来ます。
* autobahn.min.jsのライセンスは http://autobahn.ws/js に"MIT"と記されています。
* "wsuri"の値は変更して下さい。
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>FuelPHP x Ratchet WAMP Test</title>
 
<script type="text/javascript" src="http://ajax.googleapis.com/ajax/libs/jquery/1/jquery.min.js"></script>
<script type="text/javascript" src="assets/js/autobahn.min.js"></script>
 
</head>
<body>
 
<!--購読 (WampServerのonSubscribeメソッドが呼ばれる)-->
<div id="subscribe">
<select>
    <option value="topic_1">Topic 1</option>
    <option value="topic_2">Topic 2</option>
    <option value="topic_3">Topic 3</option>
    <option value="invalid_topic">Invalid Topic</option>
</select>
<button>Subscribe</button>
</div>
 
<hr />
 
<!--購読解除 (WampServerのonUnSubscribeメソッドが呼ばれる)-->
<div id="unsubscribe">
<select>
    <option value="topic_1">Topic 1</option>
    <option value="topic_2">Topic 2</option>
    <option value="topic_3">Topic 3</option>
    <option value="invalid_topic">Invalid Topic</option>
</select>
<button>Unsubscribe</button>
</div>
 
<hr />
 
<!--RPC (WampServerのonCallメソッドが呼ばれる)-->
<div id="rpc">
<select>
    <option value="get_subscribing_topics">Get Subscribing Topics</option>
    <option value="invalid_method">Invalid Method</option>
</select>
<button>Call</button>
</div>
 
<hr />
 
<p>Check your console.</p>
 
<script>
$(document).ready(function() {
    var sess; // WampServerとのコネクション
    var wsuri = 'ws://example.com:[ポート番号]';
 
    /**
     * 使い方等: http://autobahn.ws/js
     */
    sess = new ab.Session(wsuri,
 
        // コネクション接続時のコールバック関数
        function() {
            console.log("Connected!");
        },
 
        // コネクション切断時のコールバック関数
        function(reason) {
            switch (reason) {
                case ab.CONNECTION_CLOSED:
                    // 意図した切断の場合?
                    console.log("Connection was closed properly - done.");
                break;
                case ab.CONNECTION_UNREACHABLE:
                    // WampServerに到達できなかった場合?
                    console.log("Connection could not be established.");
                break;
                case ab.CONNECTION_UNSUPPORTED:
                    // ブラウザがWebSocketをサポートしていない場合
                    console.log("Browser does not support WebSocket.");
                break;
                case ab.CONNECTION_LOST:
                    // 意図しない切断の場合?
                    console.log("Connection lost - reconnecting ...");
 
                    // 1秒後に再接続を試みる
                    window.setTimeout(connect, 1000);
                break;
            }
        }
    );
 
    // 購読 (WampServerのonSubscribeメソッドが呼ばれる)
    $("#subscribe > button").click(function() {
        var select = $("#subscribe > select");
 
        console.log("-- Subscribe --");
        console.log("Topic: " + select.val());
 
        sess.subscribe(select.val(), function (topic, event) {
            console.log("-- Received --");
            console.log("Topic: " + topic);
            console.log("event: " + event);
        });
    });
 
    // 購読解除 (WampServerのonUnSubscribeメソッドが呼ばれる)
    $("#unsubscribe > button").click(function() {
        var select = $("#unsubscribe > select");
 
        console.log("-- Unsubscribe --");
        console.log("Topic: " + select.val());
 
        try {
            sess.unsubscribe(select.val());
        } catch(e) {
            console.warn(e);
        }
    });
 
    // RPC (WampServerのonCallメソッドが呼ばれる)
    $("#rpc > button").click(function() {
        var select = $("#rpc > select");
 
        console.log("-- RPC --");
        console.log("Method: " + select.val());
 
        sess.call(select.val()).then(function (result) {
           // do stuff with the result
           console.log(result);
        }, function(error) {
           // handle the error
           console.log(error);
        });
    });
 
});
</script>
 
</body>
</html>
-- fuel/app/classes/ratchet/wamp/test2.php --
* バリデーションとか、細かな処理は抜けています。
<?php
 
class Ratchet_Wamp_Test2 extends Ratchet_Wamp
{
    // トピック一覧
    private $topics = array();
 
    /**
     * 切断
     * 
     * @param  \Ratchet\ConnectionInterface $conn
     */
    public function onClose(\Ratchet\ConnectionInterface $conn) {
        // 全てのトピックを購読解除
        foreach ($this->topics as $topic)
        {
            $this->onUnSubscribe($conn, $topic);
        }
    }
 
    /**
     * 購読
     * 
     * @param  \Ratchet\ConnectionInterface $conn
     * @param  string|\Ratchet\Wamp\Topic $topic
     */
    public function onSubscribe(\Ratchet\ConnectionInterface $conn, $topic) {
        Log::debug('********** '.__FUNCTION__.' begin **********');
        Log::debug('$topic : '.$topic);
        Log::debug('********** '.__FUNCTION__.' end **********');
 
        // 不正なトピック
        if( ! in_array($topic, array('topic_1', 'topic_2', 'topic_3')))
        {
            return;
        }
 
        // トピック一覧にトピックを追加
        if (!array_key_exists($topic->getId(), $this->topics))
        {
            $this->topics[$topic->getId()] = $topic;
        }
    }
 
    /**
     * 購読解除
     * 
     * @param  \Ratchet\ConnectionInterface $conn
     * @param  string|\Ratchet\Wamp\Topic $topic
     */
    public function onUnSubscribe(\Ratchet\ConnectionInterface $conn, $topic) {
        Log::debug('********** '.__FUNCTION__.' begin **********');
        Log::debug('$topic : '.$topic);
        Log::debug('********** '.__FUNCTION__.' end **********');
 
        // 不正なトピック
        if( ! in_array($topic, array('topic_1', 'topic_2', 'topic_3')))
        {
            return;
        }
 
        // トピックからコネクションを削除
        $topic->remove($conn);
 
        // トピックの購読者が存在しない場合、トピック一覧からトピックを削除
        if ($topic->count() == 0)
        {
            unset($this->topics[$topic->getId()]);
        }
 
    }
 
    /**
     * RPC
     * 
     * @param  \Ratchet\ConnectionInterface $conn
     * @param  string $id
     * @param  string|\Ratchet\Wamp\Topic $fn
     * @param  array $params
     * @return \Ratchet\Wamp\WampConnection
     */
    public function onCall(\Ratchet\ConnectionInterface $conn, $id, $fn, array $params) {
        Log::debug('********** '.__FUNCTION__.' begin **********');
        Log::debug('$id : '.$id);
        Log::debug('$fn : '.$fn);
        Log::debug('$params : '.print_r($params, true));
        Log::debug('********** '.__FUNCTION__.' end **********');
 
        switch ($fn) {
            // 購読しているトピック一覧を取得
            case 'get_subscribing_topics':
                $subscribing_topics = array();
 
                Log::debug('********** Topics begin **********');
 
                foreach ($this->topics as $topic)
                {
                    Log::debug('$topic : '.$topic);
                    Log::debug('$topic->count() : '.$topic->count());
 
                    $topic->has($conn) and $subscribing_topics[] = $topic;
                }
 
                Log::debug('********** Topics end **********');
 
                return $conn->callResult($id, Security::htmlentities($subscribing_topics));
            break;
 
            // エラー処理
            default:
                $errorUri = 'errorUri';
                $desc = 'desc';
                $details = 'details';
 
                /**
                 * \Ratchet\Wamp\WampConnection
                 * 
                 * callError($id, $errorUri, $desc = '', $details = null)
                 */
                return $conn->callError($id, $errorUri, $desc, $details);
            break;
        }
    }

    /**
     * ZeroMQ経由でコールされる
     * 
     * @param  string $msg
     */
    public function zmqCallback($msg) {
        Log::debug('********** '.__FUNCTION__.' begin **********');
        Log::debug('$json_string : '.$msg);
        Log::debug('********** '.__FUNCTION__.' end **********');

        $json = json_decode($msg);

        if( ! isset($json->topic) || ! isset($json->msg))
        {
            return;
        }

        foreach ($this->topics as $topic)
        {
            if ($json->topic == $topic)
            {
                // 配信
                $topic->broadcast(Security::htmlentities($json->msg));
                break;
            }
        }
    }

}
 
/* end of file test2.php */
-- fuel/app/tasks/zmq.php --
<?php

namespace Fuel\Tasks;

class Zmq
{
    public static function run()
    {
        // TODO:
    }

    /**
     * ZeroMQを用いてRatchetのWampServerにpushする
     * 
     * Note:
     * http://php.zero.mq/
     * http://socketo.me/docs/push#editblogsubmission
     */
    public static function push($topic = null, $msg = null, $port = '5555')
    {
        if ($topic === null or $msg === null)
        {
            return;
        }

        $context = new \ZMQContext();
        $socket = $context->getSocket(\ZMQ::SOCKET_PUSH);
        $socket->connect("tcp://localhost:{$port}");

        $socket->send(json_encode(array(
            'topic' => $topic,
            'msg' => $msg,
        )));
    }

}

/* End of file tasks/zmp.php */
wamp_test2.htmlにアクセスすると、以下のような画面になります。
上から順に
* 選択したTopicを購読
* 選択したTopicを購読解除
* 選択したメソッドをRPCパターンでコール
となります。尚、各種情報は console.log() しています。

何れかのTopicを購読(仮にtopic_1を購読したとします。)した後
php oil r zmq:push topic_1 test
とすると、"topic_1"に対して"test"というメッセージをブラウザが受信します。

以下、参考として、前回記事と今回記事の、htmlとphp(WampServer)のdiffです。
$ diff test.html test2.html
13,26d12
< <!--配信 (WampServerのonPublishメソッドが呼ばれる)-->
< <div id="publish">
< <select>
<     <option value="topic_1">Topic 1</option>
<     <option value="topic_2">Topic 2</option>
<     <option value="topic_3">Topic 3</option>
<     <option value="invalid_topic">Invalid Topic</option>
< </select>
< <input type="text" />
< <button>Publish</button>
< </div>
<
< <hr />
<
107,123d92
<     // 配信 (WampServerのonPublishメソッドが呼ばれる)
<     $("#publish > button").click(function() {
<         var input = $("#publish > input");
<         var select = $("#publish > select");
<
<         console.log("-- Publish --");
<         if(input.val().length == 0) {
<             console.log("Input is empty.");
<         } else {
<             console.log("Topic: " + select.val());
<             console.log("Input: " + input.val());
<
<             sess.publish(select.val(), JSON.stringify({input: input.val()}));
<             input.val('');
<         }
<     });
<


$ diff test.php test2.php
3c3
< class Ratchet_Wamp_Test extends Ratchet_Wamp
---
> class Ratchet_Wamp_Test2 extends Ratchet_Wamp
22,53d21
<      * 配信
<      *
<      * @param  \Ratchet\ConnectionInterface $conn
<      * @param  string|\Ratchet\Wamp\Topic $topic
<      * @param  string $event
<      * @param  array $exclude
<      * @param  array $eligible
<      */
<     public function onPublish(\Ratchet\ConnectionInterface $conn, $topic, $event, array $exclude, array $eligible) {
<         Log::debug('********** '.__FUNCTION__.' begin **********');
<         Log::debug('$topic : '.$topic);
<         Log::debug('$event : '.$event);
<         Log::debug('$exclude : '.print_r($exclude, true));
<         Log::debug('$eligible : '.print_r($eligible, true));
<         Log::debug('********** '.__FUNCTION__.' end **********');
<
<         // 不正なトピック
<         if( ! in_array($topic, array('topic_1', 'topic_2', 'topic_3')))
<         {
<             return;
<         }
<
<         $json = json_decode($event);
<
<         // トピックに対する購読者が存在する場合、配信
<         if (array_key_exists($topic->getId(), $this->topics))
<         {
<             $topic->broadcast(Security::htmlentities($json->input));
<         }
<     }
<
<     /**
156a125,152
>     /**
>      * ZeroMQ経由でコールされる
>      *
>      * @param  string $msg
>      */
>     public function zmqCallback($msg) {
>         Log::debug('********** '.__FUNCTION__.' begin **********');
>         Log::debug('$json_string : '.$msg);
>         Log::debug('********** '.__FUNCTION__.' end **********');
>
>         $json = json_decode($msg);
>
>         if( ! isset($json->topic) || ! isset($json->msg))
>         {
>             return;
>         }
>
>         foreach ($this->topics as $topic)
>         {
>             if ($json->topic == $topic)
>             {
>                 // 配信
>                 $topic->broadcast(Security::htmlentities($json->msg));
>                 break;
>             }
>         }
>     }
>
159c155
< /* end of file test.php */
---
> /* end of file test2.php */

No comments:

Post a Comment