--
先日、以下の記事を書きました。
WebSocketとWAMPとRatchetに関するメモ
http://madroom-project.blogspot.jp/2013/05/websocketwampratchet.html
実際にFuelPHPのRatchetパッケージで、WAMPのPubSubとRPCを試してみました。
https://github.com/mp-php/fuel-packages-ratchet
具体的には、以下の機能を作ってみました。
* 指定したトピックに配信する (PubSub)
* 指定したトピックを購読する (PubSub)
* 指定したトピックを購読解除する (PubSub)
* 購読中のトピック一覧を取得する (RPC)
以下、htmlと、RatchetパッケージのRatchet_Wampクラスを継承したクラスのソースです。
-- public/wamp_test.html(実際にはviewファイル) --
* autobahn.min.jsはWampServerと通信を行うためのJSライブラリです。
* autobahn.min.jsは http://autobahn.ws/js/downloads からダウンロード出来ます。
* autobahn.min.jsのライセンスは http://autobahn.ws/js に"MIT"と記されています。
* "wsuri"の値は変更して下さい。
<!DOCTYPE html> <html lang="en"> <head> <meta charset="utf-8"> <title>FuelPHP x Ratchet WAMP Test</title> <script type="text/javascript" src="http://ajax.googleapis.com/ajax/libs/jquery/1/jquery.min.js"></script> <script type="text/javascript" src="assets/js/autobahn.min.js"></script> </head> <body> <!--配信 (WampServerのonPublishメソッドが呼ばれる)--> <div id="publish"> <select> <option value="topic_1">Topic 1</option> <option value="topic_2">Topic 2</option> <option value="topic_3">Topic 3</option> <option value="invalid_topic">Invalid Topic</option> </select> <input type="text" /> <button>Publish</button> </div> <hr /> <!--購読 (WampServerのonSubscribeメソッドが呼ばれる)--> <div id="subscribe"> <select> <option value="topic_1">Topic 1</option> <option value="topic_2">Topic 2</option> <option value="topic_3">Topic 3</option> <option value="invalid_topic">Invalid Topic</option> </select> <button>Subscribe</button> </div> <hr /> <!--購読解除 (WampServerのonUnSubscribeメソッドが呼ばれる)--> <div id="unsubscribe"> <select> <option value="topic_1">Topic 1</option> <option value="topic_2">Topic 2</option> <option value="topic_3">Topic 3</option> <option value="invalid_topic">Invalid Topic</option> </select> <button>Unsubscribe</button> </div> <hr /> <!--RPC (WampServerのonCallメソッドが呼ばれる)--> <div id="rpc"> <select> <option value="get_subscribing_topics">Get Subscribing Topics</option> <option value="invalid_method">Invalid Method</option> </select> <button>Call</button> </div> <hr /> <p>Check your console.</p> <script> $(document).ready(function() { var sess; // WampServerとのコネクション var wsuri = 'ws://example.com:[ポート番号]'; /** * 使い方等: http://autobahn.ws/js */ sess = new ab.Session(wsuri, // コネクション接続時のコールバック関数 function() { console.log("Connected!"); }, // コネクション切断時のコールバック関数 function(reason) { switch (reason) { case ab.CONNECTION_CLOSED: // 意図した切断の場合? console.log("Connection was closed properly - done."); break; case ab.CONNECTION_UNREACHABLE: // WampServerに到達できなかった場合? console.log("Connection could not be established."); break; case ab.CONNECTION_UNSUPPORTED: // ブラウザがWebSocketをサポートしていない場合 console.log("Browser does not support WebSocket."); break; case ab.CONNECTION_LOST: // 意図しない切断の場合? console.log("Connection lost - reconnecting ..."); // 1秒後に再接続を試みる window.setTimeout(connect, 1000); break; } } ); // 配信 (WampServerのonPublishメソッドが呼ばれる) $("#publish > button").click(function() { var input = $("#publish > input"); var select = $("#publish > select"); console.log("-- Publish --"); if(input.val().length == 0) { console.log("Input is empty."); } else { console.log("Topic: " + select.val()); console.log("Input: " + input.val()); sess.publish(select.val(), JSON.stringify({input: input.val()})); input.val(''); } }); // 購読 (WampServerのonSubscribeメソッドが呼ばれる) $("#subscribe > button").click(function() { var select = $("#subscribe > select"); console.log("-- Subscribe --"); console.log("Topic: " + select.val()); sess.subscribe(select.val(), function (topic, event) { console.log("-- Received --"); console.log("Topic: " + topic); console.log("event: " + event); }); }); // 購読解除 (WampServerのonUnSubscribeメソッドが呼ばれる) $("#unsubscribe > button").click(function() { var select = $("#unsubscribe > select"); console.log("-- Unsubscribe --"); console.log("Topic: " + select.val()); try { sess.unsubscribe(select.val()); } catch(e) { console.warn(e); } }); // RPC (WampServerのonCallメソッドが呼ばれる) $("#rpc > button").click(function() { var select = $("#rpc > select"); console.log("-- RPC --"); console.log("Method: " + select.val()); sess.call(select.val()).then(function (result) { // do stuff with the result console.log(result); }, function(error) { // handle the error console.log(error); }); }); }); </script> </body> </html>-- fuel/app/classes/ratchet/wamp/test.php --
* バリデーションとか、細かな処理は抜けています。
<?php class Ratchet_Wamp_Test extends Ratchet_Wamp { // トピック一覧 private $topics = array(); /** * 切断 * * @param \Ratchet\ConnectionInterface $conn */ public function onClose(\Ratchet\ConnectionInterface $conn) { // 全てのトピックを購読解除 foreach ($this->topics as $topic) { $this->onUnSubscribe($conn, $topic); } } /** * 配信 * * @param \Ratchet\ConnectionInterface $conn * @param string|\Ratchet\Wamp\Topic $topic * @param string $event * @param array $exclude * @param array $eligible */ public function onPublish(\Ratchet\ConnectionInterface $conn, $topic, $event, array $exclude, array $eligible) { Log::debug('********** '.__FUNCTION__.' begin **********'); Log::debug('$topic : '.$topic); Log::debug('$event : '.$event); Log::debug('$exclude : '.print_r($exclude, true)); Log::debug('$eligible : '.print_r($eligible, true)); Log::debug('********** '.__FUNCTION__.' end **********'); // 不正なトピック if( ! in_array($topic, array('topic_1', 'topic_2', 'topic_3'))) { return; } $json = json_decode($event); // トピックに対する購読者が存在する場合、配信 if (array_key_exists($topic->getId(), $this->topics)) { $topic->broadcast(Security::htmlentities($json->input)); } } /** * 購読 * * @param \Ratchet\ConnectionInterface $conn * @param string|\Ratchet\Wamp\Topic $topic */ public function onSubscribe(\Ratchet\ConnectionInterface $conn, $topic) { Log::debug('********** '.__FUNCTION__.' begin **********'); Log::debug('$topic : '.$topic); Log::debug('********** '.__FUNCTION__.' end **********'); // 不正なトピック if( ! in_array($topic, array('topic_1', 'topic_2', 'topic_3'))) { return; } // トピック一覧にトピックを追加 if (!array_key_exists($topic->getId(), $this->topics)) { $this->topics[$topic->getId()] = $topic; } } /** * 購読解除 * * @param \Ratchet\ConnectionInterface $conn * @param string|\Ratchet\Wamp\Topic $topic */ public function onUnSubscribe(\Ratchet\ConnectionInterface $conn, $topic) { Log::debug('********** '.__FUNCTION__.' begin **********'); Log::debug('$topic : '.$topic); Log::debug('********** '.__FUNCTION__.' end **********'); // 不正なトピック if( ! in_array($topic, array('topic_1', 'topic_2', 'topic_3'))) { return; } // トピックからコネクションを削除 $topic->remove($conn); // トピックの購読者が存在しない場合、トピック一覧からトピックを削除 if ($topic->count() == 0) { unset($this->topics[$topic->getId()]); } } /** * RPC * * @param \Ratchet\ConnectionInterface $conn * @param string $id * @param string|\Ratchet\Wamp\Topic $fn * @param array $params * @return \Ratchet\Wamp\WampConnection */ public function onCall(\Ratchet\ConnectionInterface $conn, $id, $fn, array $params) { Log::debug('********** '.__FUNCTION__.' begin **********'); Log::debug('$id : '.$id); Log::debug('$fn : '.$fn); Log::debug('$params : '.print_r($params, true)); Log::debug('********** '.__FUNCTION__.' end **********'); switch ($fn) { // 購読しているトピック一覧を取得 case 'get_subscribing_topics': $subscribing_topics = array(); Log::debug('********** Topics begin **********'); foreach ($this->topics as $topic) { Log::debug('$topic : '.$topic); Log::debug('$topic->count() : '.$topic->count()); $topic->has($conn) and $subscribing_topics[] = $topic; } Log::debug('********** Topics end **********'); return $conn->callResult($id, Security::htmlentities($subscribing_topics)); break; // エラー処理 default: $errorUri = 'errorUri'; $desc = 'desc'; $details = 'details'; /** * \Ratchet\Wamp\WampConnection * * callError($id, $errorUri, $desc = '', $details = null) */ return $conn->callError($id, $errorUri, $desc, $details); break; } } } /* end of file test.php */wamp_test.htmlにアクセスすると、以下のような画面になります。
上から順に
* 選択したTopicでメッセージを配信
* 選択したTopicを購読
* 選択したTopicを購読解除
* 選択したメソッドをRPCパターンでコール
となります。尚、各種情報は console.log() しています。
http://madroom-project.blogspot.jp/2013/04/fuelphp-x-ratchet.html
もし追加したら当記事にも追記します。
次は、ZeroMQでHTTPサーバやタスクからのPublishか。
No comments:
Post a Comment