php通过thrift访问HBase 二

abloz 2012-10-18
2012-10-18

作者:周海汉 日期:2012.10.18 原文地址:http://abloz.com/2012/10/18/php-thrift-access-hbase-two.html

本文实现了php通过thrift对HBase查询所有表名,查询表的所有记录,限制记录个数,对记录进行过滤,根据rowkey查询单行和多行的功能。 在HBase 0.9.4,Hadoop 1.0.3,php 5.0,thrift 0.8 apache2.1环境中测试通过。


[root@Hadoop48 html]# cat hbase_code.php

    
    <?php
    # author: zhouhh
    # date: 2012.10.17
    # common file
    #
    # Change this to match your thrift root
    $GLOBALS['ROOT'] = '/var/www/html';
    $GLOBALS['THRIFT_ROOT'] = '/var/www/html/src';
    $GLOBALS['TRIFTSVR']='hadoop46';
    $GLOBALS['TRIFTSVR_PORT']=9090;
    #require_once 'KLogger.php';
    #$log = new KLogger ( $GLOBALS['ROOT']."/mylog" , KLogger::DEBUG );
    $tableName="a_rule";
    
    require_once( $GLOBALS['THRIFT_ROOT'].'/Thrift.php' );
    
    require_once( $GLOBALS['THRIFT_ROOT'].'/transport/TSocket.php' );
    require_once( $GLOBALS['THRIFT_ROOT'].'/transport/TBufferedTransport.php' );
    require_once( $GLOBALS['THRIFT_ROOT'].'/protocol/TBinaryProtocol.php' );
    
    # According to the thrift documentation, compiled PHP thrift libraries should
    # reside under the THRIFT_ROOT/packages directory.  If these compiled libraries
    # are not present in this directory, move them there from gen-php/.
    require_once( $GLOBALS['THRIFT_ROOT'].'/packages/Hbase/Hbase.php' );
    
    $socket = new TSocket( $GLOBALS['TRIFTSVR'], $GLOBALS['TRIFTSVR_PORT'] );
    $socket->setSendTimeout( 10000 ); // Ten seconds (too long for production, but this is just a demo ;)
    $socket->setRecvTimeout( 20000 ); // Twenty seconds
    $transport = new TBufferedTransport( $socket );
    $protocol = new TBinaryProtocol( $transport );
    $client = new HbaseClient( $protocol );
    
    function listTable( ) {
        global $client;
        #echo( "listing tables...n" );
        $tables = $client->getTableNames();
        sort( $tables );
        printTables($tables);
        return $tables;
    
    }
    #$columnsArray = array(
    #    new ColumnDescriptor( array(
    #        'name' => 'entry:',
    #        'maxVersions' => 3
    #    ) ),
    #    new ColumnDescriptor( array(
    #        'name' => 'info:'
    #    ) )
    #);
    function createTable($tableName, $columnsArray) {
        global $client;
        echo( "creating table: {$tableName}n" );
        try {
                $client->createTable( $tableName, $columnsArray );
        } catch ( AlreadyExists $ae ) {
                echo( "WARN: {$ae->message}n" );
        }
    
    }
    function getColumnDesc($tableName) {
        global $client;
        echo( "column families in {$tableName}:n" );
        $descriptors = $client->getColumnDescriptors( $tableName );
        asort( $descriptors );
        foreach ( $descriptors as $col ) {
            echo( "column: {$col->name}, maxVer: {$col->maxVersions}n" );
        }
    
    }
    
    function getRows($tableName,$rowNames){
        global $client;
        #echo $rowNames;
        $get_arr = $client->getRows($tableName, $rowNames, null);
        if(!empty($get_arr))
        {
            #print_r($get_arr);
    
            foreach ( $get_arr as $rowresult ){
                printRow($rowresult);
            }
        }
       else
       {
            echo("get nothing of $rowNames from $tableName");
       }
    }
    
    function getRow($tableName,$rowName){
        global $client;
        echo $rowName;
        $get_arr = $client->getRow($tableName, $rowName, null);
        if(!empty($get_arr))
        {
            #print_r($get_arr);
    
            foreach ( $get_arr as $rowresult ){
                printRow($rowresult);
            }
        }
       else
       {
            echo("get nothing of $rowName from $tableName");
       }
    }
    
    function printTables($tables){
    
        foreach ( $tables as $name ) {
                echo( "t{$name}n" );
        }
    
    }
    
    function printRow( $rowresult ) {
      echo( "{$rowresult->row}n" );
      $values = $rowresult->columns;
      asort( $values );
      foreach ( $values as $k=>$v ) {
        #echo ("column=$k,");
        echo( "t{$k}=>{$v->value}," );
        echo ("ttimestamp={$v->timestamp}n");
      }
    }
    function scanTable($tableName,$startRow,$columnArray,$count=1000) {
    
        global $client;
    
        echo( "Starting scanner of $tableName...n" );
        $scanner = $client->scannerOpen( $tableName, $startRow, $columnArray, null);
        try {
            $c=0;
          while ($c < $count){           $get_arr = $client->scannerGetList($scanner,1);
              // get_arr is an array
              if($get_arr == null) break;
    
              $c+=1;
              foreach ( $get_arr as $rowresult ){
                  printRow($rowresult);
              }
          }
    
          $client->scannerClose( $scanner );
          echo( "Scanner finished of $tableNamen" );
        } catch ( NotFound $nf ) {
          $client->scannerClose( $scanner );
          echo( "Scanner finishedn" );
        }
    }
    # $filter="RowFilter(=, 'regexstring:00[1-3]00')";
    # $filter="PrefixFilter('aaa')";
    # $scan = new TScan(array("filterString" => $filter));
    function scanTableWithFilter($tableName,$filter,$count=1000) {
    
        global $client;
    
        echo( "Starting scanner of $tableName...n" );
        $scan = new TScan();
        $scan->filterString=$filter;
        $scanner = $client->scannerOpenWithScan( $tableName, $scan, null);
        try {
            $c=0;
          while ($c < $count){           $get_arr = $client->scannerGetList($scanner,1);
              // get_arr is an array
              if($get_arr == null) break;
    
              $c+=1;
              foreach ( $get_arr as $rowresult ){
                  printRow($rowresult);
              }
          }
    
          $client->scannerClose( $scanner );
          echo( "Scanner finished of $tableNamen" );
        } catch ( NotFound $nf ) {
          $client->scannerClose( $scanner );
          echo( "Scanner finishedn" );
        }
    }
    
    ?>

扫描所有表名。然后输入一个表名和返回记录条数来查询。

[root@Hadoop48 html]# cat list.php

    
    <html>
    <head>
    <title>list tables demo of zhouhh</title>
    </head>
    <body>
    <pre>
    <?php
    require_once('hbase_code.php');
    $transport->open();
    
    #list table
    listTable();
    
    $transport->close();
    
    ?>
    </pre>
    输入要扫描的表名:
    <br />
    <form action="scan.php" method="get">
    表名: <input type="text" name="tablename" value="award_1210_reserve"/><br/>
    前缀:<input type="text" name="filter" value=""/><br/>
    返回记录条数: <input type="text" name="count" value="10"/><br/>
    <input type="submit" value="提交扫描"/>
    </form>
    </body>
    </html>


[root@Hadoop48 html]# cat scan.php

    
    
    <html>
    <head>
    <title>scan demo of zhouhh</title>
    </head>
    <body>
    <pre>
    <?php
    
     
    require_once('hbase_code.php');
    if(isset($_REQUEST["tablename"]))
    {
        $t = $_REQUEST["tablename"];
    }
    else
    {
        $t=$tableName;
    }
    
    if(isset($_REQUEST["count"]))
    {
        $c = $_REQUEST["count"];
    }
    else
    {
        $c=5;
    }
    if(isset($_REQUEST["column"]))
    {
        $column=$_REQUEST["column"];
    }
    else
    {
        $column="info:";
    }
    
    $columns=explode("|", $column);
    
    if(isset($_REQUEST["filter"]))
    {
        $filter="PrefixFilter('".$_REQUEST["filter"]."')";
    }
    else
    {
        $filter="";
    }
    $transport->open();
    
    #scan table
    if(empty($filter))
    {
        scanTable($t, $row, $columns, $c);
    }
    else
    {
        scanTableWithFilter($t, $filter, $c);
    }
    
    $transport->close();
    
    ?>
    </pre>
    输入要查询的表名:
    <br />
    <form action="get.php" method="get">
    表名: <input type="text" name="tablename" value="award_1210"/><br/>
    行键(每行键占一行):<br/> <textarea rows="10" cols="100" name="startrow" value="" ></textarea><br/>
    <input type="submit" value="提交查询"/>
    </form>
    
    </body>
    </html>

输入rowkey来获取结果

[root@Hadoop48 html]# cat get.php

    
    <html>
    <head>
    <title>scan demo of zhouhh</title>
    </head>
    <body>
    <pre>
    <?php
    require_once('hbase_code.php');
    if(isset($_REQUEST["tablename"]))
    {
    
        $t = $_REQUEST["tablename"];
    }
    else
    {
        $t=$tableName;
    }
    
    if(isset($_REQUEST["count"]))
    {
        $c = $_REQUEST["count"];
    }
    else
    {
        $c=5;
    }
    
    if(isset($_REQUEST["startrow"]))
    {
        $row = $_REQUEST["startrow"];
        #echo $row;
    }
    else
    {
        $row="";
    }
    if(isset($_REQUEST["column"]))
    {
        $column=$_REQUEST["column"];
    }
    else
    {
        $column="info:";
    }
    #$columns=explode("|", $column);
    $columns= $column;
    $transport->open();
    
    #get row of table
    if(empty($row))
    {
        scanTable($t, $row, $columns, 1);
    }
    else
    {
        getRows($t,explode("x0dx0a",$row));
    }
    
    $transport->close();
    ?>
    </pre>
    </body>
    </html>



如非注明转载, 均为原创. 本站遵循知识共享CC协议,转载请注明来源