transferAll();
class TransferData{
private $dataTransfered = 0;
public function transferAll(){
echo "Transfering data please waite ....
";
flush();
//order depending on foreign keys -> pls don't change !!!
// Schema -> SYSTEM:
//$this->transferApiCalls();
//$this->transferPageCalls();
$this->transferUser();
//Schema -> DATA / DATA_PROCESSED:
$this->transferDevice_Types();
$this->transferDevice_Models();
$this->transferDevices();
$this->transferDevice_Models_Specifications_Keys();
$this->transferDevice_Models_Specifications();
$this->transferSensor_Types();
$this->transferSensor_Models();
$this->transferSensors();
$this->transferSensor_Attributes_Keys();
$this->transferSensor_Attributes();
$this->transferSeries();
$this->transferValues();
//$this->transferData_Values_Cleaned(); //-> change preprocessor to Postgres
//$this->transferOSMData();
$this->transferTag_Keys();
$this->transferTags();
//Schema -> DATA_PROCESSED:
//$this->transferData_Cache_Index(); -> change cache to Postgres
echo "
Done: Transfered ".$this->dataTransfered." rows of data :)";
flush();
}
//################################# Schema -> SYSTEM ##############################
//##############################################################################################
private function transferApiCalls(){
$rowCount = 0;
$sqlString = "SELECT * FROM `APICalls`;";
$mysql = new SYSTEM\DB\Connection(new \SYSTEM\DB\DBInfoMYS("dasensedata", "dasense", "dasenseopendataserver", "212.72.183.108"));
$res = $mysql->query($sqlString);
$postgresSql = 'INSERT INTO system.api_calls ("ID", flag, "parentID", "parentValue", name, "allowedValues") VALUES';
while($data = $res->next()){
$rowCount++;
$id = $data['ID'];
$flag = $data['Flag'];
$parentID = $data['ParentID'];
$parentValue = isset($data['ParentValue']) ? "'".$data['ParentValue']."'" : "NULL";
$name = isset($data['Name']) ? "'".$data['Name']."'" : "NULL";
$allowedValues = isset($data['AllowedValues']) ? "'".$data['AllowedValues']."'" : "NULL";
$postgresSql.=" (".$id.", ".$flag.", ".$parentID.", ".$parentValue.", ".$name.", ".$allowedValues."),";
}
$postgresSql = substr($postgresSql, 0, strlen($postgresSql)-1);
$postgresSql.=";";
$postgres = new SYSTEM\DB\Connection(new DBD\dasensePostgres());
//delete old entries
$res = $postgres->query("TRUNCATE system.api_calls");
//insert new stuff
$res = $postgres->query($postgresSql);
$this->dataTransfered = $this->dataTransfered + $rowCount;
echo 'Transfered table APICalls ['.$rowCount.' rows] (MySQL) to table system.api_calls (Postgres).
';
flush();
}
private function transferPageCalls(){
$rowCount = 0;
$sqlString = "SELECT * FROM `PAGECalls`;";
$mysql = new SYSTEM\DB\Connection(new \SYSTEM\DB\DBInfoMYS("dasensedata", "dasense", "dasenseopendataserver", "212.72.183.108"));
$res = $mysql->query($sqlString);
$postgresSql = 'INSERT INTO system.page_calls ("ID", flag, "parentID", "parentValue", name, "allowedValues") VALUES';
while($data = $res->next()){
$rowCount++;
$id = $data['ID'];
$flag = $data['Flag'];
$parentID = $data['ParentID'];
$parentValue = isset($data['ParentValue']) ? "'".$data['ParentValue']."'" : "NULL";
$name = isset($data['Name']) ? "'".$data['Name']."'" : "NULL";
$allowedValues = isset($data['AllowedValues']) ? "'".$data['AllowedValues']."'" : "NULL";
$postgresSql.=" (".$id.", ".$flag.", ".$parentID.", ".$parentValue.", ".$name.", ".$allowedValues."),";
}
$postgresSql = substr($postgresSql, 0, strlen($postgresSql)-1);
$postgresSql.=";";
$postgres = new SYSTEM\DB\Connection(new DBD\dasensePostgres());
//delete old entries
$res = $postgres->query("TRUNCATE system.page_calls");
//insert new stuff
$res = $postgres->query($postgresSql);
$this->dataTransfered = $this->dataTransfered + $rowCount;
echo 'Transfered table PAGECalls ['.$rowCount.' rows] (MySQL) to table system.page_calls (Postgres).
';
flush();
}
private function transferLocal_String(){
// sachen in postgres nicht löschen, nur unsere Sachen aus MySQL hinzufügen
$rowCount = 0;
$sqlString = "SELECT * FROM `locale_string`;";
$mysql = new SYSTEM\DB\Connection(new \SYSTEM\DB\DBInfoMYS("dasensedata", "dasense", "dasenseopendataserver", "212.72.183.108"));
$res = $mysql->query($sqlString);
$postgresSql = 'INSERT INTO system.locale_string ("id", category, "enUS", "deDE") VALUES';
while($data = $res->next()){
$rowCount++;
$id = isset($data['id']) ? "'".pg_escape_string($data['id'])."'" : "NULL";
$category = $data['category'];
$enUS = isset($data['enUS']) ? "'".pg_escape_string($data['enUS'])."'" : "NULL";
$deDE = isset($data['deDE']) ? "'".pg_escape_string($data['deDE'])."'" : "NULL";
$postgresSql.=" (".$id.", ".$category.", ".$enUS.", ".$deDE."),";
}
//echo $postgresSql;
$postgresSql = substr($postgresSql, 0, strlen($postgresSql)-1);
$postgresSql.=";";
$postgres = new SYSTEM\DB\Connection(new DBD\dasensePostgres());
//don't delete entries in postgres
//$res = $postgres->query("TRUNCATE system.locale_string");
//insert new stuff
$res = $postgres->query($postgresSql);
$this->dataTransfered = $this->dataTransfered + $rowCount;
echo 'Transfered table locale_string ['.$rowCount.' rows] (MySQL) to table system.locale_string (Postgres).
';
flush();
}
private function transferUser(){
$rowCount = 0;
$sqlString = "SELECT * FROM `user`;";
$mysql = new SYSTEM\DB\Connection(new \SYSTEM\DB\DBInfoMYS("dasenseuser", "dasense", "dasenseopendataserver", "212.72.183.108"));
$res = $mysql->query($sqlString);
$postgresSql = 'INSERT INTO system.user ("id", "username", "password_md5", "email", "joindate") VALUES';
while($data = $res->next()){
$rowCount++;
$id = $data['id'];
$username = isset($data['username']) ? "'".pg_escape_string(utf8_encode($data['username']))."'" : "NULL";
$pw_md5 = isset($data['password']) ? "'".$data['password']."'" : "NULL";
$email = isset($data['email']) ? "'".$data['email']."'" : "NULL";
$joindate = 'to_timestamp('.round($data['creationTimestamp']/1000).')';
$postgresSqlARY[] = " (".$id.", ".$username.", ".$pw_md5.", ".$email.", ".$joindate.");";
}
$postgres = new SYSTEM\DB\Connection(new DBD\dasensePostgres());
//delete old entries
$res = $postgres->query("TRUNCATE system.user CASCADE");
//insert new stuff
foreach($postgresSqlARY as $val){
$res = $postgres->query($postgresSql.$val);
}
$this->dataTransfered = $this->dataTransfered + $rowCount;
echo 'Transfered table user ['.$rowCount.' rows] (MySQL) to table system.user (Postgres).
';
flush();
}
//################################# Schema -> DATA ##############################
//############################################################################################
private function transferDevice_Types(){
$rowCount = 0;
$sqlString = "SELECT * FROM `deviceTypes`;";
$mysql = new SYSTEM\DB\Connection(new \SYSTEM\DB\DBInfoMYS("dasensedata", "dasense", "dasenseopendataserver", "212.72.183.108"));
$res = $mysql->query($sqlString);
$postgresSql = 'INSERT INTO data.device_types ("ID", type) VALUES';
while($data = $res->next()){
$rowCount++;
$id = $data['ID'];
$type = isset($data['type']) ? "'".$data['type']."'" : "NULL";
$postgresSql.=" (".$id.", ".$type."),";
}
$postgresSql = substr($postgresSql, 0, strlen($postgresSql)-1);
$postgresSql.=";";
$postgres = new SYSTEM\DB\Connection(new DBD\dasensePostgres());
//delete old entries
$res = $postgres->query("TRUNCATE data.device_types CASCADE");
//insert new stuff
$res = $postgres->query($postgresSql);
$this->dataTransfered = $this->dataTransfered + $rowCount;
echo 'Transfered table deviceTypes ['.$rowCount.' rows] (MySQL) to table data.device_types (Postgres).
';
flush();
}
private function transferDevice_Models(){
$rowCount = 0;
$sqlString = "SELECT * FROM `deviceModels`;";
$mysql = new SYSTEM\DB\Connection(new \SYSTEM\DB\DBInfoMYS("dasensedata", "dasense", "dasenseopendataserver", "212.72.183.108"));
$res = $mysql->query($sqlString);
$postgresSql = 'INSERT INTO data.device_models ("ID", "typeID", manufactor, model) VALUES';
while($data = $res->next()){
$rowCount++;
$id = $data['ID'];
$typeID = $data['typeID'];
$manu = isset($data['manufactor']) ? "'".$data['manufactor']."'" : "NULL";
$model = isset($data['model']) ? "'".$data['model']."'" : "NULL";
$postgresSql.=" (".$id.", ".$typeID.", ".$manu.", ".$model."),";
}
$postgresSql = substr($postgresSql, 0, strlen($postgresSql)-1);
$postgresSql.=";";
$postgres = new SYSTEM\DB\Connection(new DBD\dasensePostgres());
//delete old entries
$res = $postgres->query("TRUNCATE data.device_models CASCADE");
//insert new stuff
$res = $postgres->query($postgresSql);
$this->dataTransfered = $this->dataTransfered + $rowCount;
echo 'Transfered table deviceModels ['.$rowCount.' rows] (MySQL) to table data.device_models (Postgres).
';
flush();
}
private function transferDevices(){
$rowCount = 0;
$sqlString = "SELECT * FROM `devices`;";
$mysql = new SYSTEM\DB\Connection(new \SYSTEM\DB\DBInfoMYS("dasensedata", "dasense", "dasenseopendataserver", "212.72.183.108"));
$res = $mysql->query($sqlString);
$postgresSql = 'INSERT INTO data.devices ("ID", "name", "modelID", identifier) VALUES';
while($data = $res->next()){
$rowCount++;
$id = $data['ID'];
$name = isset($data['name']) ? "'".$data['name']."'" : "NULL";
$modelID = $data['modelID'];
$identifier = isset($data['identifier']) ? "'".$data['identifier']."'" : "NULL";
$postgresSql.=" (".$id.", ".$name.", ".$modelID.", ".$identifier."),";
}
$postgresSql = substr($postgresSql, 0, strlen($postgresSql)-1);
$postgresSql.=";";
$postgres = new SYSTEM\DB\Connection(new DBD\dasensePostgres());
//delete old entries
$res = $postgres->query("TRUNCATE data.devices CASCADE");
//insert new stuff
$res = $postgres->query($postgresSql);
$this->dataTransfered = $this->dataTransfered + $rowCount;
echo 'Transfered table devices ['.$rowCount.' rows] (MySQL) to table data.devices (Postgres).
';
flush();
}
private function transferDevice_Models_Specifications_Keys(){
//-> no entries in MySQL
}
private function transferDevice_Models_Specifications(){
//-> no entries in MySQL
}
private function transferSensor_Types(){
$rowCount = 0;
$sqlString = "SELECT * FROM `sensorTypes` INNER JOIN `sensorTypeBoundaries` ON sensorTypes.ID = sensorTypeBoundaries.ID;";
$mysql = new SYSTEM\DB\Connection(new \SYSTEM\DB\DBInfoMYS("dasensedata", "dasense", "dasenseopendataserver", "212.72.183.108"));
$res = $mysql->query($sqlString);
$postgresSql = 'INSERT INTO data.sensor_types ("ID", type, "measurementUnit", "maxBoundary", "minBoundary") VALUES';
while($data = $res->next()){
$rowCount++;
$id = $data['ID'];
$type = isset($data['type']) ? "'".$data['type']."'" : "NULL";
$mesUnit = isset($data['measurementUnit']) ? "'".$data['measurementUnit']."'" : "NULL";
$max = $data['max'];
$min = $data['min'];
$postgresSql.=" (".$id.", ".$type.", ".$mesUnit.", ".$max.", ".$min."),";
}
$postgresSql = substr($postgresSql, 0, strlen($postgresSql)-1);
$postgresSql.=";";
$postgres = new SYSTEM\DB\Connection(new DBD\dasensePostgres());
//delete old entries
$res = $postgres->query("TRUNCATE data.sensor_types CASCADE");
//insert new stuff
$res = $postgres->query($postgresSql);
$this->dataTransfered = $this->dataTransfered + $rowCount;
echo 'Transfered table sensorTypes, sensorTypeBoundaries ['.$rowCount.' rows] (MySQL) to table data.sensor_types (Postgres).
';
flush();
}
private function transferSensor_Models(){
$rowCount = 0;
$sqlString = "SELECT * FROM `sensorModels`;";
$mysql = new SYSTEM\DB\Connection(new \SYSTEM\DB\DBInfoMYS("dasensedata", "dasense", "dasenseopendataserver", "212.72.183.108"));
$res = $mysql->query($sqlString);
$postgresSql = 'INSERT INTO data.sensor_models ("ID", manufactor, model) VALUES';
while($data = $res->next()){
$rowCount++;
$id = $data['ID'];
$manu = isset($data['manufactor']) ? "'".$data['manufactor']."'" : "NULL";
$model = isset($data['model']) ? "'".$data['model']."'" : "NULL";
$postgresSql.=" (".$id.", ".$manu.", ".$model."),";
}
$postgresSql = substr($postgresSql, 0, strlen($postgresSql)-1);
$postgresSql.=";";
$postgres = new SYSTEM\DB\Connection(new DBD\dasensePostgres());
//delete old entries
$res = $postgres->query("TRUNCATE data.sensor_models CASCADE");
//insert new stuff
$res = $postgres->query($postgresSql);
$this->dataTransfered = $this->dataTransfered + $rowCount;
echo 'Transfered table sensorModels ['.$rowCount.' rows] (MySQL) to table data.sensor_models (Postgres).
';
flush();
}
private function transferSensors(){
$rowCount = 0;
$sqlString = "SELECT * FROM `sensors`;";
$mysql = new SYSTEM\DB\Connection(new \SYSTEM\DB\DBInfoMYS("dasensedata", "dasense", "dasenseopendataserver", "212.72.183.108"));
$res = $mysql->query($sqlString);
$postgresSql = 'INSERT INTO data.sensors ("ID", "deviceID", "modelID", "typeID") VALUES';
while($data = $res->next()){
$rowCount++;
$id = $data['ID'];
$deviceID = $data['deviceID'];
$typeID = $data['typeID'];
$modelID = isset($data['modelID']) ? $data['modelID'] : "NULL";
$postgresSql.=" (".$id.", ".$deviceID.", ".$modelID.", ".$typeID."),";
}
$postgresSql = substr($postgresSql, 0, strlen($postgresSql)-1);
$postgresSql.=";";
$postgres = new SYSTEM\DB\Connection(new DBD\dasensePostgres());
//delete old entries
$res = $postgres->query("TRUNCATE data.sensors CASCADE");
//insert new stuff
$res = $postgres->query($postgresSql);
$this->dataTransfered = $this->dataTransfered + $rowCount;
echo 'Transfered table sensors ['.$rowCount.' rows] (MySQL) to table data.sensors (Postgres).
';
flush();
}
private function transferSensor_Attributes_Keys(){
//-> no entries in MySQL
}
private function transferSensor_Attributes(){
//-> no entries in MySQL
}
private function transferSeries(){
$rowCount = 0;
$sqlString = "SELECT * FROM `measurementSeries`;";
$mysql = new SYSTEM\DB\Connection(new \SYSTEM\DB\DBInfoMYS("dasensedata", "dasense", "dasenseopendataserver", "212.72.183.108"));
$res = $mysql->query($sqlString);
$postgresSql = 'INSERT INTO data.series ("ID", "visibility", "timestamp", "name") VALUES';
while($data = $res->next()){
$rowCount++;
$id = $data['ID'];
$timestamp = 'to_timestamp('.round($data['timestamp']/1000).')';
$visibility = $data['visibility'];
$name = isset($data['name']) ? "'".$data['name']."'" : "NULL";
$postgresSql.=" (".$id.", ".$visibility.", ".$timestamp.", ".$name."),";
}
$postgresSql = substr($postgresSql, 0, strlen($postgresSql)-1);
$postgresSql.=";";
$postgres = new SYSTEM\DB\Connection(new DBD\dasensePostgres());
//delete old entries
$res = $postgres->query("TRUNCATE data.series CASCADE");
//insert new stuff
$res = $postgres->query($postgresSql);
$this->dataTransfered = $this->dataTransfered + $rowCount;
echo 'Transfered table measurementSeries ['.$rowCount.' rows] (MySQL) to table data.series (Postgres).
';
flush();
}
private function transferValues(){
//connections
$mysql = new SYSTEM\DB\Connection(new \SYSTEM\DB\DBInfoMYS("dasensedata", "dasense", "dasenseopendataserver", "212.72.183.108"));
$postgres = new SYSTEM\DB\Connection(new DBD\dasensePostgres());
//get max id
$sql = "SELECT max(ID) as IDcount FROM `values`;";
$res = $mysql->query($sql);
$res = $res->next();
$IDcount = $res['IDcount'];
$transfereIncomplete = true;
$rowCount = 0;
$min = 0;
$max = 50000;
//delete old entries
$res = $postgres->query("TRUNCATE data.values CASCADE");
while($transfereIncomplete){
$sqlString = "SELECT `values`.ID as ID,
`sensors`.typeID as typeID,
`values`.value as value,
`measurementSeries`.visibility as visibility,
`values`.timestamp as timestamp,
`locations`.acc as acc,
`locations`.alt as alt,
`locations`.lat as lat,
`locations`.long as lon,
`locations`.speed as speed,
`locations`.provider as provider,
`values`.userID as userID,
`values`.seriesID as seriesID,
`values`.sensorID as sensorID
FROM `values`
INNER JOIN `locations` ON `values`.locationID = `locations`.ID
INNER JOIN `sensors` ON `values`.sensorID = `sensors`.ID
INNER JOIN `measurementSeries` ON `values`.seriesID = `measurementSeries`.ID
WHERE `values`.ID > ".$min." && `values`.ID <= ".$max.";";
$res = $mysql->query($sqlString);
$postgresSql = 'INSERT INTO data.values ("typeID", "value", "visibility", "timestamp", "alt", "acc", "speed",
"provider", "userID", "seriesID", "sensorID", "center", "ID") VALUES';
while($data = $res->next()){
$rowCount++;
$typeID = $data['typeID'];
$value = $data['value'];
$visibility = $data['visibility'];
$timestamp = 'to_timestamp('.round($data['timestamp']/1000).')';
$alt = $data['alt'];
$acc = $data['acc'];
$speed = $data['speed'];
$provider = isset($data['provider']) ? "'".$data['provider']."'" : "NULL";
$userID = $data['userID'];
$seriesID = $data['seriesID'];
$sensorID = $data['sensorID'];
$id = $data['ID'];
$lat = $data['lat'];
$long = $data['lon'];
//long / lat -> GEOMETRY object
$center = "ST_GeomFromText('POINT(".$long." ".$lat.")')";
$postgresSql.=" (".$typeID.", ".$value.", ".$visibility.", ".$timestamp." , ".$alt." , ".$acc." , ".$speed."
, ".$provider.", ".$userID.", ".$seriesID.", ".$sensorID.", ".$center.", ".$id."),";
}
$postgresSql = substr($postgresSql, 0, strlen($postgresSql)-1);
$postgresSql.=";";
//insert new stuff
$res = $postgres->query($postgresSql);
if($max >= $IDcount){
$transfereIncomplete = false;
}else{
$min += 50000;
$max += 50000;
}
}
$this->dataTransfered = $this->dataTransfered + $rowCount;
echo 'Transfered table values ['.$rowCount.' rows] (MySQL) to table data.values (Postgres).
';
flush();
}
private function transferTag_Keys(){
$rowCount = 0;
$sqlString = "SELECT * FROM `tagKeys`;";
$mysql = new SYSTEM\DB\Connection(new \SYSTEM\DB\DBInfoMYS("dasensedata", "dasense", "dasenseopendataserver", "212.72.183.108"));
$res = $mysql->query($sqlString);
$postgresSql = 'INSERT INTO data.tag_keys ("ID", "name") VALUES';
while($data = $res->next()){
$rowCount++;
$id = $data['ID'];
$name = isset($data['name']) ? "'".$data['name']."'" : "NULL";
$postgresSql.=" (".$id.", ".$name."),";
}
$postgresSql = substr($postgresSql, 0, strlen($postgresSql)-1);
$postgresSql.=";";
$postgres = new SYSTEM\DB\Connection(new DBD\dasensePostgres());
//delete old entries
$res = $postgres->query("TRUNCATE data.tag_keys CASCADE");
//insert new stuff
$res = $postgres->query($postgresSql);
$this->dataTransfered = $this->dataTransfered + $rowCount;
echo 'Transfered table tagKeys ['.$rowCount.' rows] (MySQL) to table data.tag_keys (Postgres).
';
flush();
}
private function transferTags(){
$rowCount = 0;
$sqlString = "SELECT * FROM `tags`;";
$mysql = new SYSTEM\DB\Connection(new \SYSTEM\DB\DBInfoMYS("dasensedata", "dasense", "dasenseopendataserver", "212.72.183.108"));
$res = $mysql->query($sqlString);
$postgresSql = 'INSERT INTO data.tags ("ID", "tagKeyID", "value", "valueID") VALUES';
while($data = $res->next()){
$rowCount++;
$id = $data['ID'];
$tagKeyID = $data['tagKeyID'];
//TODO: Fix this. We have tagKeyIDs in tags which are not in tagKeys
if($tagKeyID < 1 || $tagKeyID > 7){
continue;
}
$valueID = $data['valueID'];
$value = isset($data['value']) ? "'".$data['value']."'" : "NULL";
$postgresSql.=" (".$id.", ".$tagKeyID.", ".$value.", ".$valueID."),";
}
$postgresSql = substr($postgresSql, 0, strlen($postgresSql)-1);
$postgresSql.=";";
$postgres = new SYSTEM\DB\Connection(new DBD\dasensePostgres());
//delete old entries
$res = $postgres->query("TRUNCATE data.tags CASCADE");
//insert new stuff
$res = $postgres->query($postgresSql);
$this->dataTransfered = $this->dataTransfered + $rowCount;
echo 'Transfered table tags ['.$rowCount.' rows] (MySQL) to table data.tags (Postgres).
';
flush();
}
//################################# Schema -> DATA_PROCESSED ##############################
//###################################################################################################
private function transferData_Cache_Index(){
// -> nothing to transfer
}
private function transferOSMData(){
$mysql = new SYSTEM\DB\Connection(new \SYSTEM\DB\DBInfoMYS("dasensedata", "dasense", "dasenseopendataserver", "212.72.183.108"));
$postgres = new SYSTEM\DB\Connection(new DBD\dasensePostgres());
$sql = "SELECT 4 as typeID,
0 as value,
0 as visibility,
0 as timestamp,
1 as acc,
0 as alt,
`locations`.lat as lat,
`locations`.long as lon,
0 as speed,
`locations`.provider as provider,
-1 as userID,
1 as seriesID,
417 as sensorID
FROM `locations`
WHERE `provider` = 'OSMData';";
$res = $mysql->query($sql);
$rowCount = 0;
while($result = $res->next()){
$postgres->prepare( 'insertValue',
'INSERT INTO data.values ("typeID", "value", "visibility", "alt", "acc", "speed", "provider", "userID", "seriesID", "sensorID", "center", "timestamp") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, ST_MakePoint($11, $12), to_timestamp($13)) RETURNING "ID";',
array($result['typeID'], $result['value'], $result['visibility'], $result['alt'], $result['acc'], $result['speed'], $result['provider'], $result['userID'], $result['seriesID'], $result['sensorID'], $result['lon'], $result['lat'], $result['timestamp']));
$rowCount++;
}
$this->dataTransfered = $this->dataTransfered + $rowCount;
echo 'Transfered table values -> OSM DATA ['.$rowCount.' rows] (MySQL) to table data.values (Postgres).
';
flush();
}
private function transferData_Values_Cleaned(){
//connections
$mysql = new SYSTEM\DB\Connection(new \SYSTEM\DB\DBInfoMYS("dasensedata", "dasense", "dasenseopendataserver", "212.72.183.108"));
$postgres = new SYSTEM\DB\Connection(new DBD\dasensePostgres());
//get max id
$sql = "SELECT max(ID) as IDcount FROM `valuesCleaned`;";
$res = $mysql->query($sql);
$res = $res->next();
$IDcount = $res['IDcount'];
$transfereIncomplete = true;
$rowCount = 0;
$min = 0;
$max = 50000;
//delete old entries
$res = $postgres->query("TRUNCATE data_processed.data_values_cleaned CASCADE");
while($transfereIncomplete){
$sqlString = "SELECT `valuesCleaned`.ID as ID,
`valuesCleaned`.valuesID as valuesID,
`valuesCleaned`.typeID as typeID,
`valuesCleaned`.value as value,
`valuesCleaned`.visibility as visibility,
`valuesCleaned`.timestamp as timestamp,
`valuesCleaned`.acc as acc,
`valuesCleaned`.alt as alt,
`valuesCleaned`.lat as lat,
`valuesCleaned`.long as lon,
`valuesCleaned`.speed as speed,
`valuesCleaned`.provider as provider,
`valuesCleaned`.userID as userID,
`valuesCleaned`.seriesID as seriesID,
`valuesCleaned`.sensorID as sensorID,
`valuesCleaned`.cluster as cluster,
`valuesCleaned`.values_count as values_count
FROM `valuesCleaned`
WHERE `valuesCleaned`.ID > ".$min." && `valuesCleaned`.ID <= ".$max.";";
$res = $mysql->query($sqlString);
$postgresSql = 'INSERT INTO data_processed.data_values_cleaned ("valuesID", "typeID", "value", "visibility", "timestamp", "alt", "acc", "speed",
"provider", "userID", "seriesID", "sensorID", "center", "ID", "cluster", "values_count") VALUES';
while($data = $res->next()){
$rowCount++;
$valuesID = $data['valuesID'];
$typeID = $data['typeID'];
$value = $data['value'];
$visibility = $data['visibility'];
$timestamp = 'to_timestamp('.round($data['timestamp']/1000).')';
$alt = $data['alt'];
$acc = $data['acc'];
$speed = $data['speed'];
$provider = isset($data['provider']) ? "'".$data['provider']."'" : "NULL";
$userID = $data['userID'];
$seriesID = $data['seriesID'];
$sensorID = $data['sensorID'];
$id = $data['ID'];
$cluster = $data['cluster'];
$values_count = $data['values_count'];
$lat = $data['lat'];
$long = $data['lon'];
//long / lat -> GEOMETRY object
$center = "ST_GeomFromText('POINT(".$long." ".$lat.")')";
$postgresSql.=" (".$valuesID.", ".$typeID.", ".$value.", ".$visibility.", ".$timestamp." , ".$alt." , ".$acc." , ".$speed."
, ".$provider.", ".$userID.", ".$seriesID.", ".$sensorID.", ".$center.", ".$id.", ".$cluster.", ".$values_count."),";
}
$postgresSql = substr($postgresSql, 0, strlen($postgresSql)-1);
$postgresSql.=";";
//insert new stuff
$res = $postgres->query($postgresSql);
if($max >= $IDcount){
$transfereIncomplete = false;
}else{
$min += 50000;
$max += 50000;
}
}
$this->dataTransfered = $this->dataTransfered + $rowCount;
echo 'Transfered table values_cleaned ['.$rowCount.' rows] (MySQL) to table data_processed.data_values_cleaned (Postgres).
';
flush();
}
}
?>