Why I prefer to use PHP's stream wrapper over cURL

Don't get me wrong: the cURL support that is available to us PHP developers through the curl extension is a solid foundation, and you can do almost everything with it. What it fails to do has nothing to do with cURL or libcurl; the problem lies in our beloved language, and I will show you how these problems are solved, or at least alleviated, when one decides to use PHP streams. Besides, when a library as renowned as Guzzle has historically preferred cURL for sending HTTP requests, you can be sure that cURL is as solid as you need it to be for production.

Anyway, let's say we are asked to develop a crawler. This crawler should receive a list of URLs to crawl and an XPath query, search the content of each page for a specific piece of information, and report the result as an associative array that uses the URLs as keys and the results of the query as values. If the crawler fails to download the content of a URL, it should mark the incident by reporting false for that URL, and if the XPath query does not evaluate successfully or produces an empty result, null should be reported.
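
To make the contract concrete, the returned array could look roughly like this (the URLs and values are purely hypothetical):

[
    'https://example.com/page/1' => 'Some extracted value', // downloaded and the query produced a result
    'https://example.com/page/2' => false,                  // the download failed
    'https://example.com/page/3' => null,                   // the query failed or produced an empty result
]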

The first step is to decide on the signature of our function, and since we are such responsible developers, we decide to use generators to save some memory:

/**
 * @param Generator $urlGenerator
 * @param string $query
 * @return array
 */
function fetch(Generator $urlGenerator, string $query): array
{
}

Here is a sample generator that can provide URLs which share the same format:

/**
 * @param string $format
 * @param string ...$values
 * @return Generator
 */
function myGenerator(string $format, string ...$values): Generator
{
    foreach ($values as $value) {
        yield sprintf($format, $value);
    }
}

And to give you an idea, this generator can be used as follows:

myGenerator('https://www.w3schools.com/html/html_%s.asp', 'intro', 'editors',  'basic', 'elements');

It is a simple yet powerful idea: imagine how you could write a generator function which yields a series of sequential URLs, for example from page 90 to page 250.
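
For instance, a hypothetical generator for such a range of pages could look like this (the function name and URL format are just placeholders):

function pageRangeGenerator(string $format, int $from, int $to): Generator
{
    for ($page = $from; $page <= $to; $page++) {
        yield sprintf($format, $page);
    }
}

// pageRangeGenerator('https://example.com/articles?page=%d', 90, 250);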

Now it is time to implement our "fetch" function, and since the first implementation is pretty straightforward, I will not waste your time describing the code.

/**
 * @param Generator $urlGenerator
 * @param string $query
 * @return array
 */
function fetch(Generator $urlGenerator, string $query): array
{
    libxml_use_internal_errors(true);
    $context = stream_context_create([
        'http' => [
            'method' => 'GET',
            'header' => "Accept: text/html\r\n"
                . "User-Agent: user-agent: curl/7.64.0\r\n",
        ]
    ]);
    $result = [];
    /** @var string $url */
    foreach ($urlGenerator as $url) {
        if (!is_string($url)) {
            throw new InvalidArgumentException('Generator should yield strings');
        }
        $connection = @fopen($url, 'r', false, $context);
        if ($connection === false) {
            $result[$url] = false;
            continue;
        }

        $html = stream_get_contents($connection);
        fclose($connection);
        $doc = new DOMDocument();
        $doc->loadHTML($html);
        $xpath = new DOMXPath($doc);
        $queryResult = $xpath->evaluate($query);
        $result[$url] = $queryResult === false
            ? null
            : (is_scalar($queryResult)
                ? $queryResult
                : ($queryResult->length === 0
                    ? null
                    : $queryResult[0]->nodeValue
                )
            );
    }
    return $result;
}
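
To see how the pieces fit together, here is a small usage sketch; the URLs and the XPath query are only examples, extracting each page's title:

$results = fetch(
    myGenerator('https://www.w3schools.com/html/html_%s.asp', 'intro', 'editors', 'basic', 'elements'),
    '//title[text()]' // evaluates to a node list; fetch() reports the first node's value
);
print_r($results);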

If we wanted to stop here, we could remove all the code that uses streams and replace it with its cURL counterparts (i.e. curl_* functions such as curl_init, curl_setopt/curl_setopt_array, curl_exec and curl_close; a rough sketch follows below). But since my aim is to show you how powerful and liberating working with streams can be, let's update our requirement list.
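
Before moving on, here is a minimal sketch of that per-URL download using the curl extension, assuming the same surrounding loop and the same DOMDocument/DOMXPath handling as above:

$ch = curl_init($url);
curl_setopt_array($ch, [
    CURLOPT_RETURNTRANSFER => true,                   // return the body instead of printing it
    CURLOPT_HTTPHEADER     => ['Accept: text/html'],
    CURLOPT_USERAGENT      => 'curl/7.64.0',
]);
$html = curl_exec($ch);
curl_close($ch);
if ($html === false) {
    $result[$url] = false;
    continue;
}
// $html is then parsed with DOMDocument / DOMXPath exactly as before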

Modify the signature of the function to accept a third parameter: an integer that determines the maximum number of concurrent requests / open connections the function may have.

Concurrency in PHP seems odd: we do not have promises/futures or threads built into the language, and our access to I/O is almost always blocking. There are extensions that more or less provide such functionality; however, if our single function required its users to install those extensions, it would have little chance of becoming popular among developers. To clarify the requirement: we do not need to write a function that can be called asynchronously. It should, however, be able to load URLs concurrently to shorten the time required to perform its task, if it is asked to do so.

This is where most PHP functions that do not use stream wrappers to work with I/O, or that abstract away the underlying streams, fail to perform. Let's see how we can modify our code to load URL contents concurrently, and since the final function will be a little bloated, I will walk you through it section by section to give you a good understanding of its behavior.

First let's modify the signature of the function as follows:

/**
 * @param Generator $urlGenerator
 * @param string $query
 * @param int $maximumConcurrentConnections
 * @return array
 */
function advancedFetch(Generator $urlGenerator, string $query, int $maximumConcurrentConnections = 1): array
{
    libxml_use_internal_errors(true);
    $result = [];

    // the logic will be implemented soon

    return $result;
}

As I was preparing this article I realized that I could not use "fopen" anymore, since it blocks until the first byte is ready to be read, so I had to use another function that gave me more control over the flow of the process: stream_socket_client. But before that, let's declare some variables. The names are self-explanatory, and I will be using the URLs as the keys of these arrays so that I know the status of my calls.

$activeConnections = 0;
$connections = [];
$readBuffer = [];
$writeBuffer = [];

Since we will no longer use a "foreach" loop, we have to manually rewind the generator and come up with our own loop. As you can see, the loop continues until it makes sure that all the values from the generator are consumed (i.e. the generator is closed) and there is no active connection.

$urlGenerator->rewind();
do {

    // The rest of the logic will be implemented here

} while ($urlGenerator->valid() || $activeConnections > 0);

We use "stream_socket_client" to open a connection for each URL but it is a little tricky since the way we open a connection for "http" (i.e. tcp://<address>:<port>) is different from "https" (i.e. ssl://<address>:<port>) so we use "preg_match" to determine the protocol, the address, the port - if exists otherwise we will fallback to the default ports which are 80 and 443 - and the actual URI. If the connection is successfully established we make sure that it won't block using "stream_set_blocking" function and we will add it to the list of open connections and increase the number of open connections. Also we create an entry for the URL in the write buffer which is the HTTP message that should be written to this connection further in the process. As you can see while we have not exceeded the limit and there are more URLs that should be crawled we keep trying to create connections and load more URLs concurrently.

while ($urlGenerator->valid() && $activeConnections < $maximumConcurrentConnections) {
    $url = $urlGenerator->current();
    $urlGenerator->next();
    if (preg_match('/^(https?):\\/\\/([^\\/:]+)(|:\d+)(|\\/.*)$/', $url, $matches) !== 1) {
        $result[$url] = false;
        continue;
    }
    $port = empty($matches[3]) ? ($matches[1] === 'https' ? 443 : 80) : substr($matches[3], 1);
    $remoteSocket = sprintf('%s://%s:%d', $matches[1] === 'https' ? 'ssl' : 'tcp', $matches[2], $port);
    $connection = @stream_socket_client($remoteSocket, $errorno, $errorstr, 5);
    if ($connection === false) {
        $result[$url] = false;
        continue;
    }
    stream_set_blocking($connection, false);
    $connections[$url] = $connection;
    $activeConnections += 1;
    // fall back to '/' when the URL has no path
    $writeBuffer[$url] = "GET " . ($matches[4] ?: '/') . " HTTP/1.0\r\n"
        . "Host: {$matches[2]}\r\n"
        . "User-Agent: curl/7.64.0\r\n"
        . "Accept: */*\r\n\r\n";
}
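
To illustrate what the pattern captures, here is what $matches would contain for a hypothetical URL:

preg_match('/^(https?):\\/\\/([^\\/:]+)(|:\d+)(|\\/.*)$/', 'https://example.com:8443/docs?page=2', $matches);
// $matches[1] => 'https'          the protocol, used to choose between ssl:// and tcp://
// $matches[2] => 'example.com'    the host, used for the connection and the Host header
// $matches[3] => ':8443'          the port, empty when omitted (then 80 or 443 is used)
// $matches[4] => '/docs?page=2'   the request target, empty when omitted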

Now that we have opened as many connections as we are allowed, it's time we started writing to and reading from them. The connections we are interested in reading from are obvious: all the connections that are currently open. To obtain the list of connections we need to write to, we have to check our write buffer; using its keys we can extract, from the connections array, the subset of connections we are willing to write to.

$tempReadConnections = $connections;
$tempWriteConnections = empty($writeBuffer) ? null : array_intersect_key($connections, $writeBuffer);
$null = null;
if (stream_select($tempReadConnections, $tempWriteConnections, $null, 5) === false) {
    continue;
}

First we perform the writes and then we proceed with the reads. As you can see, we have to make sure that "$tempWriteConnections" is an array, as in the previous section we might have assigned null to it to indicate that we did not need to write to any of our connections. Since "stream_select" modifies the arrays it receives, we cannot rely on the order of the members or on the keys to associate a connection with a specific URL, so we have to use the "array_search" function to check each connection against our connections array and find the URL it is associated with. Once we know which URL a connection belongs to, we can retrieve its message from our write buffer and write it to the connection.

if (!is_null($tempWriteConnections)) {
    foreach ($tempWriteConnections as $c) {
        $url = array_search($c, $connections);
        $message = $writeBuffer[$url];
        $bytesWrote = fwrite($c, $message, 1024);
        if ($bytesWrote === false) {
            continue;
        }
        $remainingBytes = substr($message, $bytesWrote);
        if (empty($remainingBytes)) {
            unset($writeBuffer[$url]);
        } else {
            $writeBuffer[$url] = $remainingBytes;
        }
    }
}

The read procedure is almost the same. If a connection has not reached its end, we read from it and append the data to its designated entry in the read buffer. If "feof" returns true for a connection, it means that we have read all the data from it and we can continue with processing the HTML document using the XPath query we received.

foreach ($tempReadConnections as $c) {
    $url = array_search($c, $connections);
    if (!feof($c)) {
        $readBuffer[$url] = (isset($readBuffer[$url]) ? $readBuffer[$url] : '') . fread($c, 1024);
        continue;
    }
    fclose($c);
    $activeConnections -= 1;
    $html = $readBuffer[$url];
    unset($connections[$url], $readBuffer[$url]);
    $doc = new DOMDocument();
    $doc->loadHTML($html);
    $xpath = new DOMXPath($doc);
    $queryResult = $xpath->evaluate($query);
    $result[$url] = $queryResult === false
        ? null
        : (is_scalar($queryResult)
            ? $queryResult
            : ($queryResult->length === 0
                ? null
                : $queryResult[0]->nodeValue
            )
        );
}

Here is the whole function:

/**
 * @param Generator $urlGenerator
 * @param string $query
 * @param int $maximumConcurrentConnections
 * @return array
 */
function advancedFetch(Generator $urlGenerator, string $query, int $maximumConcurrentConnections = 1): array
{
    libxml_use_internal_errors(true);
    $result = [];
    $activeConnections = 0;
    $connections = [];
    $readBuffer = [];
    $writeBuffer = [];
    $urlGenerator->rewind();
    do {
        while ($urlGenerator->valid() && $activeConnections < $maximumConcurrentConnections) {
            $url = $urlGenerator->current();
            $urlGenerator->next();
            if (preg_match('/^(https?):\\/\\/([^\\/:]+)(|:\d+)(|\\/.*)$/', $url, $matches) !== 1) {
                $result[$url] = false;
                continue;
            }
            $port = empty($matches[3]) ? ($matches[1] === 'https' ? 443 : 80) : substr($matches[3], 1);
            $remoteSocket = sprintf('%s://%s:%d', $matches[1] === 'https' ? 'ssl' : 'tcp', $matches[2], $port);
            $connection = @stream_socket_client($remoteSocket, $errorno, $errorstr, 5);
            if ($connection === false) {
                $result[$url] = false;
                continue;
            }
            stream_set_blocking($connection, false);
            $connections[$url] = $connection;
            $activeConnections += 1;
            $writeBuffer[$url] = "GET {$matches[4]} HTTP/1.0\r\n"
                . "Host: {$matches[2]}\r\n"
                . "User-Agent: curl/7.64.0\r\n"
                . "Accept: */*\r\n\r\n";
        }

        $tempReadConnections = $connections;
        $tempWriteConnections = empty($writeBuffer) ? null : array_intersect_key($connections, $writeBuffer);
        $null = null;
        if (stream_select($tempReadConnections, $tempWriteConnections, $null, 5) === false) {
            continue;
        }
        if (!is_null($tempWriteConnections)) {
            foreach ($tempWriteConnections as $c) {
                $url = array_search($c, $connections);
                $message = $writeBuffer[$url];
                $bytesWrote = fwrite($c, $message, 1024);
                if ($bytesWrote === false) {
                    continue;
                }
                $remainingBytes = substr($message, $bytesWrote);
                if (empty($remainingBytes)) {
                    unset($writeBuffer[$url]);
                } else {
                    $writeBuffer[$url] = $remainingBytes;
                }
            }
        }
        foreach ($tempReadConnections as $c) {
            $url = array_search($c, $connections);
            if (!feof($c)) {
                $readBuffer[$url] = (isset($readBuffer[$url]) ? $readBuffer[$url] : '') . fread($c, 1024);
                continue;
            }
            fclose($c);
            $activeConnections -= 1;
            $html = $readBuffer[$url];
            unset($connections[$url], $readBuffer[$url]);
            $doc = new DOMDocument();
            $doc->loadHTML($html);
            $xpath = new DOMXPath($doc);
            $queryResult = $xpath->evaluate($query);
            $result[$url] = $queryResult === false
                ? null
                : (is_scalar($queryResult)
                    ? $queryResult
                    : ($queryResult->length === 0
                        ? null
                        : $queryResult[0]->nodeValue
                    )
                );
        }
    } while ($urlGenerator->valid() || $activeConnections > 0);
    return $result;
}

Addendum

Here is a mock script that you can use as the URL you intend to crawl in order to test this function:

<?php
#mock.php
$time = time();
sleep(2); // To simulate the delay
?>
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Page <?php echo $_GET['page'] ?? 0 ?> - The time was <?php echo $time ?> and responded
        at <?php echo time() ?></title>
</head>
<body>
<div><?php echo str_pad('', mt_rand(5000, 6000),  'Test '); ?></div>
</body>
</html>

So a good generator function for this test could be the following:

/**
 * @param string $format
 * @param int $from
 * @param int $to
 * @return Generator
 */
function mySecondGenerator(string $format, int $from, int $to): Generator
{
    for ($i = $from; $i <= $to; $i++) {
        yield sprintf($format, $i);
    }
}

And you can run the test as follows:

print_r(advancedFetch(mySecondGenerator('https://localhost/mock.php?page=%d', 10, 100), '//title[text()]', 10));
