slingit.org/system/db/connection/ConnectionAMQP.php
2015-04-01 20:52:52 +02:00

66 lines
2.0 KiB
PHP

<?php
namespace SYSTEM\DB;
class ConnectionAMQP extends ConnectionAbstr {
private $connection = NULL;
public function __construct(DBInfo $dbinfo){
$this->connection = new \AMQPConnection(
array(
'host' => $dbinfo->m_host,
'vhost' => $dbinfo->m_database,
'port' => $dbinfo->m_port,
'login' => $dbinfo->m_user,
'password' => $dbinfo->m_password
));
$this->connection->connect();
if(!$this->connection || !$this->connection->isConnected()){
throw new \SYSTEM\LOG\ERROR('Cannot connect to the amqp queue!');}
}
public function send($msg){
$channel = new \AMQPChannel($this->connection);
$exchange = new \AMQPExchange($channel);
$exchange->setFlags(AMQP_DURABLE);
$exchange->setName('exchange2');
$exchange->setType('direct');
//$exchange->declare();
$exchange->declareExchange();
$queue = new \AMQPQueue($channel);
$queue->setName('series');
$queue->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
//$queue->declare();
$queue->declareQueue();
$queue->bind('exchange2','series');
$channel->startTransaction();
$message = $exchange->publish(json_encode($msg), 'series', AMQP_MANDATORY,
array('content_type' => 'application/json',
'delivery_mode' => 2));
$channel->commitTransaction();
if(!$message) {
throw new \SYSTEM\LOG\ERROR("Error: Message '".$message."' was not sent to queue.!");
}
}
public function __destruct(){
$this->close();
}
public function close(){
if (!$this->connection->disconnect()) {
throw new Exception("Could not disconnect !");
}
}
public function query($query){ }
public function prepare($stmtName, $stmt, $values){}
}