FuelPHP x RatchetでWAMPのPubSubとRPCを試してみた
http://madroom-project.blogspot.jp/2013/05/fuelphp-x-ratchetwamppubsubrpc.html
前回の記事もそうですが、これまでのサンプル等は、全て配信者がクライアント(ブラウザ)です。
対して、今回はFuelPHPのタスクから配信してみます。また、未確認ですが、当然、Controller等からも可能なはずです。
前回の記事のサンプルと異なるのは
* htmlとRatchetのWampServerにpublishの機能が無い
* 代わりに、タスクからZeroMQ経由でpublishする
* RatchetのWampServerにzmqCallbackというメソッドを用意する
となります。尚、"zmqCallback"というメソッド名はRatchetパッケージのRatchetタスクのwampメソッドで登録しているだけなので、変更可能です。
ZeroMQのインストール手順については、以下を参考にして下さい。
FuelPHPでWebSocketを扱うパッケージを作りました
http://madroom-project.blogspot.jp/2013/04/fuelphpwebsocket.html
ZeroMQとはなんぞや。については、後日、別途まとめたいなと思っていますm(_ _)m
http://www.zeromq.org/
以下、html、RatchetパッケージのRatchet_Wampクラスを継承したクラス、タスクのソースです。
-- public/wamp_test2.html(実際にはviewファイル) --
* autobahn.min.jsはWampServerと通信を行うためのJSライブラリです。
* autobahn.min.jsは http://autobahn.ws/js/downloads からダウンロード出来ます。
* autobahn.min.jsのライセンスは http://autobahn.ws/js に"MIT"と記されています。
* "wsuri"の値は変更して下さい。
<!DOCTYPE html> <html lang="en"> <head> <meta charset="utf-8"> <title>FuelPHP x Ratchet WAMP Test</title> <script type="text/javascript" src="http://ajax.googleapis.com/ajax/libs/jquery/1/jquery.min.js"></script> <script type="text/javascript" src="assets/js/autobahn.min.js"></script> </head> <body> <!--購読 (WampServerのonSubscribeメソッドが呼ばれる)--> <div id="subscribe"> <select> <option value="topic_1">Topic 1</option> <option value="topic_2">Topic 2</option> <option value="topic_3">Topic 3</option> <option value="invalid_topic">Invalid Topic</option> </select> <button>Subscribe</button> </div> <hr /> <!--購読解除 (WampServerのonUnSubscribeメソッドが呼ばれる)--> <div id="unsubscribe"> <select> <option value="topic_1">Topic 1</option> <option value="topic_2">Topic 2</option> <option value="topic_3">Topic 3</option> <option value="invalid_topic">Invalid Topic</option> </select> <button>Unsubscribe</button> </div> <hr /> <!--RPC (WampServerのonCallメソッドが呼ばれる)--> <div id="rpc"> <select> <option value="get_subscribing_topics">Get Subscribing Topics</option> <option value="invalid_method">Invalid Method</option> </select> <button>Call</button> </div> <hr /> <p>Check your console.</p> <script> $(document).ready(function() { var sess; // WampServerとのコネクション var wsuri = 'ws://example.com:[ポート番号]'; /** * 使い方等: http://autobahn.ws/js */ sess = new ab.Session(wsuri, // コネクション接続時のコールバック関数 function() { console.log("Connected!"); }, // コネクション切断時のコールバック関数 function(reason) { switch (reason) { case ab.CONNECTION_CLOSED: // 意図した切断の場合? console.log("Connection was closed properly - done."); break; case ab.CONNECTION_UNREACHABLE: // WampServerに到達できなかった場合? console.log("Connection could not be established."); break; case ab.CONNECTION_UNSUPPORTED: // ブラウザがWebSocketをサポートしていない場合 console.log("Browser does not support WebSocket."); break; case ab.CONNECTION_LOST: // 意図しない切断の場合? console.log("Connection lost - reconnecting ..."); // 1秒後に再接続を試みる window.setTimeout(connect, 1000); break; } } ); // 購読 (WampServerのonSubscribeメソッドが呼ばれる) $("#subscribe > button").click(function() { var select = $("#subscribe > select"); console.log("-- Subscribe --"); console.log("Topic: " + select.val()); sess.subscribe(select.val(), function (topic, event) { console.log("-- Received --"); console.log("Topic: " + topic); console.log("event: " + event); }); }); // 購読解除 (WampServerのonUnSubscribeメソッドが呼ばれる) $("#unsubscribe > button").click(function() { var select = $("#unsubscribe > select"); console.log("-- Unsubscribe --"); console.log("Topic: " + select.val()); try { sess.unsubscribe(select.val()); } catch(e) { console.warn(e); } }); // RPC (WampServerのonCallメソッドが呼ばれる) $("#rpc > button").click(function() { var select = $("#rpc > select"); console.log("-- RPC --"); console.log("Method: " + select.val()); sess.call(select.val()).then(function (result) { // do stuff with the result console.log(result); }, function(error) { // handle the error console.log(error); }); }); }); </script> </body> </html>-- fuel/app/classes/ratchet/wamp/test2.php --
* バリデーションとか、細かな処理は抜けています。
<?php class Ratchet_Wamp_Test2 extends Ratchet_Wamp { // トピック一覧 private $topics = array(); /** * 切断 * * @param \Ratchet\ConnectionInterface $conn */ public function onClose(\Ratchet\ConnectionInterface $conn) { // 全てのトピックを購読解除 foreach ($this->topics as $topic) { $this->onUnSubscribe($conn, $topic); } } /** * 購読 * * @param \Ratchet\ConnectionInterface $conn * @param string|\Ratchet\Wamp\Topic $topic */ public function onSubscribe(\Ratchet\ConnectionInterface $conn, $topic) { Log::debug('********** '.__FUNCTION__.' begin **********'); Log::debug('$topic : '.$topic); Log::debug('********** '.__FUNCTION__.' end **********'); // 不正なトピック if( ! in_array($topic, array('topic_1', 'topic_2', 'topic_3'))) { return; } // トピック一覧にトピックを追加 if (!array_key_exists($topic->getId(), $this->topics)) { $this->topics[$topic->getId()] = $topic; } } /** * 購読解除 * * @param \Ratchet\ConnectionInterface $conn * @param string|\Ratchet\Wamp\Topic $topic */ public function onUnSubscribe(\Ratchet\ConnectionInterface $conn, $topic) { Log::debug('********** '.__FUNCTION__.' begin **********'); Log::debug('$topic : '.$topic); Log::debug('********** '.__FUNCTION__.' end **********'); // 不正なトピック if( ! in_array($topic, array('topic_1', 'topic_2', 'topic_3'))) { return; } // トピックからコネクションを削除 $topic->remove($conn); // トピックの購読者が存在しない場合、トピック一覧からトピックを削除 if ($topic->count() == 0) { unset($this->topics[$topic->getId()]); } } /** * RPC * * @param \Ratchet\ConnectionInterface $conn * @param string $id * @param string|\Ratchet\Wamp\Topic $fn * @param array $params * @return \Ratchet\Wamp\WampConnection */ public function onCall(\Ratchet\ConnectionInterface $conn, $id, $fn, array $params) { Log::debug('********** '.__FUNCTION__.' begin **********'); Log::debug('$id : '.$id); Log::debug('$fn : '.$fn); Log::debug('$params : '.print_r($params, true)); Log::debug('********** '.__FUNCTION__.' end **********'); switch ($fn) { // 購読しているトピック一覧を取得 case 'get_subscribing_topics': $subscribing_topics = array(); Log::debug('********** Topics begin **********'); foreach ($this->topics as $topic) { Log::debug('$topic : '.$topic); Log::debug('$topic->count() : '.$topic->count()); $topic->has($conn) and $subscribing_topics[] = $topic; } Log::debug('********** Topics end **********'); return $conn->callResult($id, Security::htmlentities($subscribing_topics)); break; // エラー処理 default: $errorUri = 'errorUri'; $desc = 'desc'; $details = 'details'; /** * \Ratchet\Wamp\WampConnection * * callError($id, $errorUri, $desc = '', $details = null) */ return $conn->callError($id, $errorUri, $desc, $details); break; } } /** * ZeroMQ経由でコールされる * * @param string $msg */ public function zmqCallback($msg) { Log::debug('********** '.__FUNCTION__.' begin **********'); Log::debug('$json_string : '.$msg); Log::debug('********** '.__FUNCTION__.' end **********'); $json = json_decode($msg); if( ! isset($json->topic) || ! isset($json->msg)) { return; } foreach ($this->topics as $topic) { if ($json->topic == $topic) { // 配信 $topic->broadcast(Security::htmlentities($json->msg)); break; } } } } /* end of file test2.php */-- fuel/app/tasks/zmq.php --
<?php namespace Fuel\Tasks; class Zmq { public static function run() { // TODO: } /** * ZeroMQを用いてRatchetのWampServerにpushする * * Note: * http://php.zero.mq/ * http://socketo.me/docs/push#editblogsubmission */ public static function push($topic = null, $msg = null, $port = '5555') { if ($topic === null or $msg === null) { return; } $context = new \ZMQContext(); $socket = $context->getSocket(\ZMQ::SOCKET_PUSH); $socket->connect("tcp://localhost:{$port}"); $socket->send(json_encode(array( 'topic' => $topic, 'msg' => $msg, ))); } } /* End of file tasks/zmp.php */wamp_test2.htmlにアクセスすると、以下のような画面になります。
上から順に
* 選択したTopicを購読
* 選択したTopicを購読解除
* 選択したメソッドをRPCパターンでコール
となります。尚、各種情報は console.log() しています。
何れかのTopicを購読(仮にtopic_1を購読したとします。)した後
php oil r zmq:push topic_1 test
とすると、"topic_1"に対して"test"というメッセージをブラウザが受信します。
以下、参考として、前回記事と今回記事の、htmlとphp(WampServer)のdiffです。
$ diff test.html test2.html 13,26d12 < <!--配信 (WampServerのonPublishメソッドが呼ばれる)--> < <div id="publish"> < <select> < <option value="topic_1">Topic 1</option> < <option value="topic_2">Topic 2</option> < <option value="topic_3">Topic 3</option> < <option value="invalid_topic">Invalid Topic</option> < </select> < <input type="text" /> < <button>Publish</button> < </div> < < <hr /> < 107,123d92 < // 配信 (WampServerのonPublishメソッドが呼ばれる) < $("#publish > button").click(function() { < var input = $("#publish > input"); < var select = $("#publish > select"); < < console.log("-- Publish --"); < if(input.val().length == 0) { < console.log("Input is empty."); < } else { < console.log("Topic: " + select.val()); < console.log("Input: " + input.val()); < < sess.publish(select.val(), JSON.stringify({input: input.val()})); < input.val(''); < } < }); < $ diff test.php test2.php 3c3 < class Ratchet_Wamp_Test extends Ratchet_Wamp --- > class Ratchet_Wamp_Test2 extends Ratchet_Wamp 22,53d21 < * 配信 < * < * @param \Ratchet\ConnectionInterface $conn < * @param string|\Ratchet\Wamp\Topic $topic < * @param string $event < * @param array $exclude < * @param array $eligible < */ < public function onPublish(\Ratchet\ConnectionInterface $conn, $topic, $event, array $exclude, array $eligible) { < Log::debug('********** '.__FUNCTION__.' begin **********'); < Log::debug('$topic : '.$topic); < Log::debug('$event : '.$event); < Log::debug('$exclude : '.print_r($exclude, true)); < Log::debug('$eligible : '.print_r($eligible, true)); < Log::debug('********** '.__FUNCTION__.' end **********'); < < // 不正なトピック < if( ! in_array($topic, array('topic_1', 'topic_2', 'topic_3'))) < { < return; < } < < $json = json_decode($event); < < // トピックに対する購読者が存在する場合、配信 < if (array_key_exists($topic->getId(), $this->topics)) < { < $topic->broadcast(Security::htmlentities($json->input)); < } < } < < /** 156a125,152 > /** > * ZeroMQ経由でコールされる > * > * @param string $msg > */ > public function zmqCallback($msg) { > Log::debug('********** '.__FUNCTION__.' begin **********'); > Log::debug('$json_string : '.$msg); > Log::debug('********** '.__FUNCTION__.' end **********'); > > $json = json_decode($msg); > > if( ! isset($json->topic) || ! isset($json->msg)) > { > return; > } > > foreach ($this->topics as $topic) > { > if ($json->topic == $topic) > { > // 配信 > $topic->broadcast(Security::htmlentities($json->msg)); > break; > } > } > } > 159c155 < /* end of file test.php */ --- > /* end of file test2.php */
No comments:
Post a Comment