前回の記事で、RatchetのWAMPの使い方を簡単に確認してみました。
FuelPHP x RatchetでWAMPのPubSubとRPCを試してみた
http://madroom-project.blogspot.jp/2013/05/fuelphp-x-ratchetwamppubsubrpc.html
前回の記事もそうですが、これまでのサンプル等は、全て配信者がクライアント(ブラウザ)です。
対して、今回はFuelPHPのタスクから配信してみます。また、未確認ですが、当然、Controller等からも可能なはずです。
前回の記事のサンプルと異なるのは
* htmlとRatchetのWampServerにpublishの機能が無い
* 代わりに、タスクからZeroMQ経由でpublishする
* RatchetのWampServerにzmqCallbackというメソッドを用意する
となります。尚、"zmqCallback"というメソッド名はRatchetパッケージのRatchetタスクのwampメソッドで登録しているだけなので、変更可能です。
ZeroMQのインストール手順については、以下を参考にして下さい。
FuelPHPでWebSocketを扱うパッケージを作りました
http://madroom-project.blogspot.jp/2013/04/fuelphpwebsocket.html
ZeroMQとはなんぞや。については、後日、別途まとめたいなと思っていますm(_ _)m
http://www.zeromq.org/
以下、html、RatchetパッケージのRatchet_Wampクラスを継承したクラス、タスクのソースです。
-- public/wamp_test2.html(実際にはviewファイル) --
* autobahn.min.jsはWampServerと通信を行うためのJSライブラリです。
* autobahn.min.jsは http://autobahn.ws/js/downloads からダウンロード出来ます。
* autobahn.min.jsのライセンスは http://autobahn.ws/js に"MIT"と記されています。
* "wsuri"の値は変更して下さい。
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>FuelPHP x Ratchet WAMP Test</title>
<script type="text/javascript" src="http://ajax.googleapis.com/ajax/libs/jquery/1/jquery.min.js"></script>
<script type="text/javascript" src="assets/js/autobahn.min.js"></script>
</head>
<body>
<!--購読 (WampServerのonSubscribeメソッドが呼ばれる)-->
<div id="subscribe">
<select>
<option value="topic_1">Topic 1</option>
<option value="topic_2">Topic 2</option>
<option value="topic_3">Topic 3</option>
<option value="invalid_topic">Invalid Topic</option>
</select>
<button>Subscribe</button>
</div>
<hr />
<!--購読解除 (WampServerのonUnSubscribeメソッドが呼ばれる)-->
<div id="unsubscribe">
<select>
<option value="topic_1">Topic 1</option>
<option value="topic_2">Topic 2</option>
<option value="topic_3">Topic 3</option>
<option value="invalid_topic">Invalid Topic</option>
</select>
<button>Unsubscribe</button>
</div>
<hr />
<!--RPC (WampServerのonCallメソッドが呼ばれる)-->
<div id="rpc">
<select>
<option value="get_subscribing_topics">Get Subscribing Topics</option>
<option value="invalid_method">Invalid Method</option>
</select>
<button>Call</button>
</div>
<hr />
<p>Check your console.</p>
<script>
$(document).ready(function() {
var sess; // WampServerとのコネクション
var wsuri = 'ws://example.com:[ポート番号]';
/**
* 使い方等: http://autobahn.ws/js
*/
sess = new ab.Session(wsuri,
// コネクション接続時のコールバック関数
function() {
console.log("Connected!");
},
// コネクション切断時のコールバック関数
function(reason) {
switch (reason) {
case ab.CONNECTION_CLOSED:
// 意図した切断の場合?
console.log("Connection was closed properly - done.");
break;
case ab.CONNECTION_UNREACHABLE:
// WampServerに到達できなかった場合?
console.log("Connection could not be established.");
break;
case ab.CONNECTION_UNSUPPORTED:
// ブラウザがWebSocketをサポートしていない場合
console.log("Browser does not support WebSocket.");
break;
case ab.CONNECTION_LOST:
// 意図しない切断の場合?
console.log("Connection lost - reconnecting ...");
// 1秒後に再接続を試みる
window.setTimeout(connect, 1000);
break;
}
}
);
// 購読 (WampServerのonSubscribeメソッドが呼ばれる)
$("#subscribe > button").click(function() {
var select = $("#subscribe > select");
console.log("-- Subscribe --");
console.log("Topic: " + select.val());
sess.subscribe(select.val(), function (topic, event) {
console.log("-- Received --");
console.log("Topic: " + topic);
console.log("event: " + event);
});
});
// 購読解除 (WampServerのonUnSubscribeメソッドが呼ばれる)
$("#unsubscribe > button").click(function() {
var select = $("#unsubscribe > select");
console.log("-- Unsubscribe --");
console.log("Topic: " + select.val());
try {
sess.unsubscribe(select.val());
} catch(e) {
console.warn(e);
}
});
// RPC (WampServerのonCallメソッドが呼ばれる)
$("#rpc > button").click(function() {
var select = $("#rpc > select");
console.log("-- RPC --");
console.log("Method: " + select.val());
sess.call(select.val()).then(function (result) {
// do stuff with the result
console.log(result);
}, function(error) {
// handle the error
console.log(error);
});
});
});
</script>
</body>
</html>
-- fuel/app/classes/ratchet/wamp/test2.php --
* バリデーションとか、細かな処理は抜けています。
<?php
class Ratchet_Wamp_Test2 extends Ratchet_Wamp
{
// トピック一覧
private $topics = array();
/**
* 切断
*
* @param \Ratchet\ConnectionInterface $conn
*/
public function onClose(\Ratchet\ConnectionInterface $conn) {
// 全てのトピックを購読解除
foreach ($this->topics as $topic)
{
$this->onUnSubscribe($conn, $topic);
}
}
/**
* 購読
*
* @param \Ratchet\ConnectionInterface $conn
* @param string|\Ratchet\Wamp\Topic $topic
*/
public function onSubscribe(\Ratchet\ConnectionInterface $conn, $topic) {
Log::debug('********** '.__FUNCTION__.' begin **********');
Log::debug('$topic : '.$topic);
Log::debug('********** '.__FUNCTION__.' end **********');
// 不正なトピック
if( ! in_array($topic, array('topic_1', 'topic_2', 'topic_3')))
{
return;
}
// トピック一覧にトピックを追加
if (!array_key_exists($topic->getId(), $this->topics))
{
$this->topics[$topic->getId()] = $topic;
}
}
/**
* 購読解除
*
* @param \Ratchet\ConnectionInterface $conn
* @param string|\Ratchet\Wamp\Topic $topic
*/
public function onUnSubscribe(\Ratchet\ConnectionInterface $conn, $topic) {
Log::debug('********** '.__FUNCTION__.' begin **********');
Log::debug('$topic : '.$topic);
Log::debug('********** '.__FUNCTION__.' end **********');
// 不正なトピック
if( ! in_array($topic, array('topic_1', 'topic_2', 'topic_3')))
{
return;
}
// トピックからコネクションを削除
$topic->remove($conn);
// トピックの購読者が存在しない場合、トピック一覧からトピックを削除
if ($topic->count() == 0)
{
unset($this->topics[$topic->getId()]);
}
}
/**
* RPC
*
* @param \Ratchet\ConnectionInterface $conn
* @param string $id
* @param string|\Ratchet\Wamp\Topic $fn
* @param array $params
* @return \Ratchet\Wamp\WampConnection
*/
public function onCall(\Ratchet\ConnectionInterface $conn, $id, $fn, array $params) {
Log::debug('********** '.__FUNCTION__.' begin **********');
Log::debug('$id : '.$id);
Log::debug('$fn : '.$fn);
Log::debug('$params : '.print_r($params, true));
Log::debug('********** '.__FUNCTION__.' end **********');
switch ($fn) {
// 購読しているトピック一覧を取得
case 'get_subscribing_topics':
$subscribing_topics = array();
Log::debug('********** Topics begin **********');
foreach ($this->topics as $topic)
{
Log::debug('$topic : '.$topic);
Log::debug('$topic->count() : '.$topic->count());
$topic->has($conn) and $subscribing_topics[] = $topic;
}
Log::debug('********** Topics end **********');
return $conn->callResult($id, Security::htmlentities($subscribing_topics));
break;
// エラー処理
default:
$errorUri = 'errorUri';
$desc = 'desc';
$details = 'details';
/**
* \Ratchet\Wamp\WampConnection
*
* callError($id, $errorUri, $desc = '', $details = null)
*/
return $conn->callError($id, $errorUri, $desc, $details);
break;
}
}
/**
* ZeroMQ経由でコールされる
*
* @param string $msg
*/
public function zmqCallback($msg) {
Log::debug('********** '.__FUNCTION__.' begin **********');
Log::debug('$json_string : '.$msg);
Log::debug('********** '.__FUNCTION__.' end **********');
$json = json_decode($msg);
if( ! isset($json->topic) || ! isset($json->msg))
{
return;
}
foreach ($this->topics as $topic)
{
if ($json->topic == $topic)
{
// 配信
$topic->broadcast(Security::htmlentities($json->msg));
break;
}
}
}
}
/* end of file test2.php */
-- fuel/app/tasks/zmq.php --
<?php
namespace Fuel\Tasks;
class Zmq
{
public static function run()
{
// TODO:
}
/**
* ZeroMQを用いてRatchetのWampServerにpushする
*
* Note:
* http://php.zero.mq/
* http://socketo.me/docs/push#editblogsubmission
*/
public static function push($topic = null, $msg = null, $port = '5555')
{
if ($topic === null or $msg === null)
{
return;
}
$context = new \ZMQContext();
$socket = $context->getSocket(\ZMQ::SOCKET_PUSH);
$socket->connect("tcp://localhost:{$port}");
$socket->send(json_encode(array(
'topic' => $topic,
'msg' => $msg,
)));
}
}
/* End of file tasks/zmp.php */
wamp_test2.htmlにアクセスすると、以下のような画面になります。
上から順に
* 選択したTopicを購読
* 選択したTopicを購読解除
* 選択したメソッドをRPCパターンでコール
となります。尚、各種情報は console.log() しています。
何れかのTopicを購読(仮にtopic_1を購読したとします。)した後
php oil r zmq:push topic_1 test
とすると、"topic_1"に対して"test"というメッセージをブラウザが受信します。
以下、参考として、前回記事と今回記事の、htmlとphp(WampServer)のdiffです。
$ diff test.html test2.html
13,26d12
< <!--配信 (WampServerのonPublishメソッドが呼ばれる)-->
< <div id="publish">
< <select>
< <option value="topic_1">Topic 1</option>
< <option value="topic_2">Topic 2</option>
< <option value="topic_3">Topic 3</option>
< <option value="invalid_topic">Invalid Topic</option>
< </select>
< <input type="text" />
< <button>Publish</button>
< </div>
<
< <hr />
<
107,123d92
< // 配信 (WampServerのonPublishメソッドが呼ばれる)
< $("#publish > button").click(function() {
< var input = $("#publish > input");
< var select = $("#publish > select");
<
< console.log("-- Publish --");
< if(input.val().length == 0) {
< console.log("Input is empty.");
< } else {
< console.log("Topic: " + select.val());
< console.log("Input: " + input.val());
<
< sess.publish(select.val(), JSON.stringify({input: input.val()}));
< input.val('');
< }
< });
<
$ diff test.php test2.php
3c3
< class Ratchet_Wamp_Test extends Ratchet_Wamp
---
> class Ratchet_Wamp_Test2 extends Ratchet_Wamp
22,53d21
< * 配信
< *
< * @param \Ratchet\ConnectionInterface $conn
< * @param string|\Ratchet\Wamp\Topic $topic
< * @param string $event
< * @param array $exclude
< * @param array $eligible
< */
< public function onPublish(\Ratchet\ConnectionInterface $conn, $topic, $event, array $exclude, array $eligible) {
< Log::debug('********** '.__FUNCTION__.' begin **********');
< Log::debug('$topic : '.$topic);
< Log::debug('$event : '.$event);
< Log::debug('$exclude : '.print_r($exclude, true));
< Log::debug('$eligible : '.print_r($eligible, true));
< Log::debug('********** '.__FUNCTION__.' end **********');
<
< // 不正なトピック
< if( ! in_array($topic, array('topic_1', 'topic_2', 'topic_3')))
< {
< return;
< }
<
< $json = json_decode($event);
<
< // トピックに対する購読者が存在する場合、配信
< if (array_key_exists($topic->getId(), $this->topics))
< {
< $topic->broadcast(Security::htmlentities($json->input));
< }
< }
<
< /**
156a125,152
> /**
> * ZeroMQ経由でコールされる
> *
> * @param string $msg
> */
> public function zmqCallback($msg) {
> Log::debug('********** '.__FUNCTION__.' begin **********');
> Log::debug('$json_string : '.$msg);
> Log::debug('********** '.__FUNCTION__.' end **********');
>
> $json = json_decode($msg);
>
> if( ! isset($json->topic) || ! isset($json->msg))
> {
> return;
> }
>
> foreach ($this->topics as $topic)
> {
> if ($json->topic == $topic)
> {
> // 配信
> $topic->broadcast(Security::htmlentities($json->msg));
> break;
> }
> }
> }
>
159c155
< /* end of file test.php */
---
> /* end of file test2.php */