Accessing HBase from PHP via Thrift, Part 2

abloz 2012-10-18

Author: 周海汉 Date: 2012.10.18 Original post: http://abloz.com/2012/10/18/php-thrift-access-hbase-two.html

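This post uses PHP and Thrift to perform the following HBase operations: listing all table names, scanning all records of a table, limiting the number of records returned, filtering records, and fetching one or several rows by row key. Tested with HBase 0.9.4, Hadoop 1.0.3, PHP 5.0, Thrift 0.8, and Apache 2.1.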

[root@Hadoop48 html]# cat hbase_code.php

<?php
# author: zhouhh
# date: 2012.10.17
# common file
#
# Change this to match your thrift root
$GLOBALS['ROOT'] = '/var/www/html';
$GLOBALS['THRIFT_ROOT'] = '/var/www/html/src';
$GLOBALS['TRIFTSVR']='hadoop46';
$GLOBALS['TRIFTSVR_PORT']=9090;
#require_once 'KLogger.php';
#$log = new KLogger ( $GLOBALS['ROOT']."/mylog" , KLogger::DEBUG );
$tableName="a_rule";

require_once( $GLOBALS['THRIFT_ROOT'].'/Thrift.php' );

require_once( $GLOBALS['THRIFT_ROOT'].'/transport/TSocket.php' );
require_once( $GLOBALS['THRIFT_ROOT'].'/transport/TBufferedTransport.php' );
require_once( $GLOBALS['THRIFT_ROOT'].'/protocol/TBinaryProtocol.php' );

# According to the thrift documentation, compiled PHP thrift libraries should
# reside under the THRIFT_ROOT/packages directory.  If these compiled libraries
# are not present in this directory, move them there from gen-php/.
require_once( $GLOBALS['THRIFT_ROOT'].'/packages/Hbase/Hbase.php' );

$socket = new TSocket( $GLOBALS['TRIFTSVR'], $GLOBALS['TRIFTSVR_PORT'] );
$socket->setSendTimeout( 10000 ); // Ten seconds (too long for production, but this is just a demo ;)
$socket->setRecvTimeout( 20000 ); // Twenty seconds
$transport = new TBufferedTransport( $socket );
$protocol = new TBinaryProtocol( $transport );
$client = new HbaseClient( $protocol );

function listTable( ) {
    global $client;
    #echo( "listing tables...\n" );
    $tables = $client->getTableNames();
    sort( $tables );
    printTables($tables);
    return $tables;

}
#$columnsArray = array(
#    new ColumnDescriptor( array(
#        'name' => 'entry:',
#        'maxVersions' => 3
#    ) ),
#    new ColumnDescriptor( array(
#        'name' => 'info:'
#    ) )
#);
function createTable($tableName, $columnsArray) {
    global $client;
    echo( "creating table: {$tableName}\n" );
    try {
            $client->createTable( $tableName, $columnsArray );
    } catch ( AlreadyExists $ae ) {
            echo( "WARN: {$ae->message}\n" );
    }

}
function getColumnDesc($tableName) {
    global $client;
    echo( "column families in {$tableName}:\n" );
    $descriptors = $client->getColumnDescriptors( $tableName );
    asort( $descriptors );
    foreach ( $descriptors as $col ) {
        echo( "column: {$col->name}, maxVer: {$col->maxVersions}\n" );
    }

}

function getRows($tableName,$rowNames){
    global $client;
    #echo $rowNames;
    $get_arr = $client->getRows($tableName, $rowNames, null);
    if(!empty($get_arr))
    {
        #print_r($get_arr);

        foreach ( $get_arr as $rowresult ){
            printRow($rowresult);
        }
    }
   else
   {
        # $rowNames is an array, so join it before printing
        echo("get nothing of ".implode(",", $rowNames)." from $tableName\n");
   }
}

function getRow($tableName,$rowName){
    global $client;
    echo $rowName;
    $get_arr = $client->getRow($tableName, $rowName, null);
    if(!empty($get_arr))
    {
        #print_r($get_arr);

        foreach ( $get_arr as $rowresult ){
            printRow($rowresult);
        }
    }
   else
   {
        echo("get nothing of $rowName from $tableName");
   }
}

function printTables($tables){

    foreach ( $tables as $name ) {
            echo( "\t{$name}\n" );
    }

}

function printRow( $rowresult ) {
  echo( "{$rowresult->row}\n" );
  $values = $rowresult->columns;
  asort( $values );
  foreach ( $values as $k=>$v ) {
    #echo ("column=$k,");
    echo( "\t{$k}=>{$v->value}," );
    echo ("\ttimestamp={$v->timestamp}\n");
  }
}
function scanTable($tableName,$startRow,$columnArray,$count=1000) {

    global $client;

    echo( "Starting scanner of $tableName...\n" );
    $scanner = $client->scannerOpen( $tableName, $startRow, $columnArray, null);
    try {
        $c=0;
      while ($c < $count){
          // fetch one row per call so that $c counts rows
          $get_arr = $client->scannerGetList($scanner,1);
          // get_arr is an array
          if($get_arr == null) break;

          $c+=1;
          foreach ( $get_arr as $rowresult ){
              printRow($rowresult);
          }
      }

      $client->scannerClose( $scanner );
      echo( "Scanner finished of {$tableName}\n" );
    } catch ( NotFound $nf ) {
      $client->scannerClose( $scanner );
      echo( "Scanner finished\n" );
    }
}
# $filter="RowFilter(=, 'regexstring:00[1-3]00')";
# $filter="PrefixFilter('aaa')";
# $scan = new TScan(array("filterString" => $filter));
function scanTableWithFilter($tableName,$filter,$count=1000) {

    global $client;

    echo( "Starting scanner of $tableName...\n" );
    $scan = new TScan();
    $scan->filterString=$filter;
    $scanner = $client->scannerOpenWithScan( $tableName, $scan, null);
    try {
        $c=0;
      while ($c < $count){
          // fetch one row per call so that $c counts rows
          $get_arr = $client->scannerGetList($scanner,1);
          // get_arr is an array
          if($get_arr == null) break;

          $c+=1;
          foreach ( $get_arr as $rowresult ){
              printRow($rowresult);
          }
      }

      $client->scannerClose( $scanner );
      echo( "Scanner finished of {$tableName}\n" );
    } catch ( NotFound $nf ) {
      $client->scannerClose( $scanner );
      echo( "Scanner finished\n" );
    }
}

?>
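
The commented-out examples above build filter strings by hand. As a minimal sketch (the helper names `quoteFilterArg`, `buildPrefixFilter`, and `buildRowRegexFilter` are hypothetical and not part of hbase_code.php), the strings could be assembled as shown below. It assumes, as I understand the HBase filter language, that a single quote inside an argument is escaped by doubling it.

```php
<?php
// Hypothetical helper (not in hbase_code.php): wrap a value in single quotes
// for the HBase filter language. Assumption: an embedded single quote is
// escaped by writing it twice.
function quoteFilterArg($value) {
    return "'" . str_replace("'", "''", $value) . "'";
}

// Build a PrefixFilter string matching row keys that start with $prefix.
function buildPrefixFilter($prefix) {
    return "PrefixFilter(" . quoteFilterArg($prefix) . ")";
}

// Build a RowFilter string matching row keys against a regular expression.
function buildRowRegexFilter($regex) {
    return "RowFilter(=, " . quoteFilterArg("regexstring:" . $regex) . ")";
}

echo buildPrefixFilter("aaa"), "\n";
echo buildRowRegexFilter("00[1-3]00"), "\n";
echo buildPrefixFilter("o'neil"), "\n";
```

The resulting string can be passed as the `$filter` argument of `scanTableWithFilter()`.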

List all table names, then enter a table name and the number of records to return in order to scan it.

[root@Hadoop48 html]# cat list.php

<html>
<head>
<title>list tables demo of zhouhh</title>
</head>
<body>
<pre>
<?php
require_once('hbase_code.php');
$transport->open();

#list table
listTable();

$transport->close();

?>
</pre>
Enter the table name to scan:
<br />
<form action="scan.php" method="get">
Table name: <input type="text" name="tablename" value="award_1210_reserve"/><br/>
Prefix: <input type="text" name="filter" value=""/><br/>
Number of records to return: <input type="text" name="count" value="10"/><br/>
<input type="submit" value="Submit scan"/>
</form>
</body>
</html>
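
list.php echoes table names straight into the page inside `<pre>`. A rough sketch of an escaped variant follows; `formatTables` is a hypothetical name, and returning a string instead of echoing is my own choice for illustration, not the author's approach.

```php
<?php
// Hypothetical variant of printTables(): returns the listing as a string and
// HTML-escapes each name so characters such as "<" display literally in <pre>.
function formatTables(array $tables) {
    $out = "";
    foreach ($tables as $name) {
        $out .= "\t" . htmlspecialchars($name, ENT_QUOTES, 'UTF-8') . "\n";
    }
    return $out;
}

echo formatTables(array("a_rule", "award_1210"));
```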

[root@Hadoop48 html]# cat scan.php

<html>
<head>
<title>scan demo of zhouhh</title>
</head>
<body>
<pre>
<?php

 
require_once('hbase_code.php');
if(isset($_REQUEST["tablename"]))
{
    $t = $_REQUEST["tablename"];
}
else
{
    $t=$tableName;
}

if(isset($_REQUEST["count"]))
{
    $c = $_REQUEST["count"];
}
else
{
    $c=5;
}
if(isset($_REQUEST["column"]))
{
    $column=$_REQUEST["column"];
}
else
{
    $column="info:";
}

$columns=explode("|", $column);

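# the form always submits "filter", so also check that it is not empty
if(isset($_REQUEST["filter"]) && $_REQUEST["filter"] !== "")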
{
    $filter="PrefixFilter('".$_REQUEST["filter"]."')";
}
else
{
    $filter="";
}
$transport->open();

#scan table
if(empty($filter))
{
    # $row is never set in this script; an empty start row scans from the first row
    scanTable($t, "", $columns, $c);
}
else
{
    scanTableWithFilter($t, $filter, $c);
}

$transport->close();

?>
</pre>
Enter the table name to query:
<br />
<form action="get.php" method="get">
Table name: <input type="text" name="tablename" value="award_1210"/><br/>
Row keys (one per line):<br/> <textarea rows="10" cols="100" name="startrow" value="" ></textarea><br/>
<input type="submit" value="Submit query"/>
</form>

</body>
</html>
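
scan.php passes the request values on more or less as received. The sketch below shows one way the `count` and `column` parameters might be normalized first. The helper names `intParam` and `columnsParam`, and the cap of 1000, are assumptions for illustration only.

```php
<?php
// Hypothetical helper: read a positive integer from a request array, falling
// back to $default when missing or non-numeric, and capping at $max.
function intParam(array $request, $name, $default, $max) {
    if (!isset($request[$name]) || !ctype_digit((string)$request[$name])) {
        return $default;
    }
    $n = (int)$request[$name];
    return max(1, min($n, $max));
}

// Hypothetical helper: split a value such as "info:|entry:" into a list of
// column names, dropping empty pieces.
function columnsParam(array $request, $name, $default) {
    $raw = isset($request[$name]) ? $request[$name] : $default;
    $parts = array_map('trim', explode("|", $raw));
    return array_values(array_filter($parts, 'strlen'));
}

$request = array("count" => "10", "column" => "info:|entry:");
var_dump(intParam($request, "count", 5, 1000));
print_r(columnsParam($request, "column", "info:"));
```

In scan.php, `$request` would be `$_REQUEST`, and the results would take the place of `$c` and `$columns`.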

Enter row keys to fetch the matching rows.

[root@Hadoop48 html]# cat get.php

<html>
<head>
<title>get demo of zhouhh</title>
</head>
<body>
<pre>
<?php
require_once('hbase_code.php');
if(isset($_REQUEST["tablename"]))
{

    $t = $_REQUEST["tablename"];
}
else
{
    $t=$tableName;
}

if(isset($_REQUEST["count"]))
{
    $c = $_REQUEST["count"];
}
else
{
    $c=5;
}

if(isset($_REQUEST["startrow"]))
{
    $row = $_REQUEST["startrow"];
    #echo $row;
}
else
{
    $row="";
}
if(isset($_REQUEST["column"]))
{
    $column=$_REQUEST["column"];
}
else
{
    $column="info:";
}
# scanTable() expects an array of column names
$columns=explode("|", $column);
$transport->open();

#get row of table
if(empty($row))
{
    scanTable($t, $row, $columns, 1);
}
else
{
    # the textarea submits one row key per line, separated by CRLF
    getRows($t,explode("\x0d\x0a",$row));
}

$transport->close();
?>
</pre>
</body>
</html>
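
get.php splits the textarea input on CRLF only. A slightly more tolerant split is sketched below; `splitRowKeys` is a hypothetical helper, and trimming each line assumes that row keys never begin or end with whitespace.

```php
<?php
// Hypothetical helper: split textarea input into row keys, one per line.
// Accepts \r\n, \r, or \n line endings and skips blank lines.
// Assumption: row keys have no leading or trailing whitespace, so trim() is safe.
function splitRowKeys($text) {
    $lines = preg_split('/\r\n|\r|\n/', $text);
    $keys = array();
    foreach ($lines as $line) {
        $line = trim($line);
        if ($line !== '') {
            $keys[] = $line;
        }
    }
    return $keys;
}

print_r(splitRowKeys("row1\r\nrow2\n\nrow3\r\n"));
```

The returned array can be passed directly as the `$rowNames` argument of `getRows()`.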

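Unless marked as a repost, all content is original. This site follows the Creative Commons (CC) license; please credit the source when reposting.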