Akka HTTP Client API Notes

Bullet-point notes on the Akka HTTP client

  • Akka HTTP client-side functionality is provided by the akka-http-core module
  • First, let's cover some implications of the streaming nature of request/response entities
  • These matter most if you come from a non-streaming HTTP client background

These notes are derived from the Akka HTTP client documentation.


Three different types of Akka HTTP clients

  • Request-Level Client-Side API
      • Akka HTTP handles connection management
      • Recommended approach, and probably the approach we will take
  • Host-Level Client-Side API
      • Akka HTTP manages a connection pool to one specific host/port endpoint
      • Recommended when you can supply a Source[HttpRequest, _] with requests to run against a single host over pooled connections
  • Connection-Level Client-Side API
      • Full control over the HTTP connection lifecycle (open/close)
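
As a rough orientation, each of the three levels corresponds to one entry point on the Http() extension. The sketch below only builds the request future or flow for each level; it sends nothing until the flows are materialized. It assumes the same explicit-materializer setup used in the other examples here, and the object name, method names, and host/port parameters are hypothetical.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse }
import akka.stream.Materializer
import akka.stream.scaladsl.Flow

import scala.concurrent.Future
import scala.util.Try

// Hypothetical helper object; names are for illustration only.
object ClientApiLevels {

  // Request-level: Akka HTTP picks (or creates) a pooled connection per request.
  def requestLevel(uri: String)(implicit system: ActorSystem, mat: Materializer): Future[HttpResponse] =
    Http().singleRequest(HttpRequest(uri = uri))

  // Host-level: a flow backed by a connection pool for one host/port.
  // The Int is a caller-chosen correlation id, since responses may arrive out of order.
  def hostLevel(host: String, port: Int = 80)(implicit system: ActorSystem, mat: Materializer)
      : Flow[(HttpRequest, Int), (Try[HttpResponse], Int), Http.HostConnectionPool] =
    Http().cachedHostConnectionPool[Int](host, port)

  // Connection-level: one connection, opened each time the flow is materialized.
  def connectionLevel(host: String, port: Int = 80)(implicit system: ActorSystem, mat: Materializer)
      : Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] =
    Http().outgoingConnection(host, port)
}
```

Whichever level is used, the response entity still has to be consumed or discarded by the caller.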


Streaming nature of Request/Response Entities

  • Akka HTTP is streaming all the way through
  • Akka Streams based back-pressure mechanisms are exposed at all layers:
      • TCP layer
      • HTTP server
      • HttpRequest
      • HttpResponse
      • HttpEntity
  • Surprising implications if you are used to non-streaming / non-reactive HTTP clients:
      • Not consuming the HTTP entity signals back-pressure to the other side of the connection, by design
      • This lets both servers and clients apply back-pressure
      • It prevents the application from being overwhelmed
      • It avoids unneeded buffering of the HttpEntity in memory

Warning: "Consuming (or discarding) the Entity of a request is mandatory! If accidentally left neither consumed or discarded Akka HTTP will assume the incoming data should remain back-pressured, and will stall the incoming data via TCP back-pressure mechanisms. A client should consume the Entity regardless of the status of the HttpResponse."
-- Akka Docs


HTTP Client-Side: Handle streaming HTTP Entities


Consuming HTTP Response Entity (Http Client)

  • Common use case: consuming the response entity on the client
      • Use the entity.dataBytes Source
      • On the server side, directives such as BasicDirectives.extractDataBytes serve the same purpose
  • Best practice: use streaming techniques so that streams are used as they were meant to be
      • framing the incoming chunks
      • parsing them line by line
      • connecting the flow to another destination Sink or Akka Streams connector


Example using a Source from an HttpEntity

import java.io.File

import akka.actor.ActorSystem
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ FileIO, Framing }
import akka.util.ByteString

implicit val system = ActorSystem()
implicit val dispatcher = system.dispatcher
implicit val materializer = ActorMaterializer()

val response: HttpResponse = ???

response.entity.dataBytes
  .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256))
  .map(transformEachLine)
  .runWith(FileIO.toPath(new File("/tmp/example.out").toPath))

def transformEachLine(line: ByteString): ByteString = ???
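
For experimenting without a real server, the same pipeline can be run against an in-memory entity. The sketch below is an assumption-laden variant of the example above: the object name and sample data are made up, and the chunk boundaries are deliberately misaligned with the line breaks to show that Framing.delimiter reassembles lines across chunks.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.{ ContentTypes, HttpEntity, HttpResponse }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Framing, Sink, Source }
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object; not part of Akka HTTP.
object FramingSketch {
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("framing-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Three chunks that split "beta" and "gamma" in the middle.
      val chunks = List("alpha\nbe", "ta\ngam", "ma\n").map(ByteString(_))
      val response = HttpResponse(
        entity = HttpEntity(ContentTypes.`text/plain(UTF-8)`, Source(chunks)))

      val lines = response.entity.dataBytes
        .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256))
        .map(_.utf8String)
        .runWith(Sink.seq)

      // Blocking is only for the demo; real code would keep working with the Future.
      Await.result(lines, 3.seconds)
    } finally system.terminate()
  }
}
```

Calling FramingSketch.run() should yield the three lines alpha, beta and gamma. Note that with the default allowTruncation = false, a final line without a trailing delimiter fails the stream, so pass allowTruncation = true if the input may end without a newline.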


Consuming the whole entity

  • Sometimes you want to consume the whole entity at once
      • Example: you expect an entire list of Avro records
      • and you can't process them one at a time
  • Akka HTTP provides a special method for this: toStrict(timeout)
      • Use it to eagerly consume the entity in one go
      • The entire entity is loaded into memory, and the returned Future fails if that does not finish within the timeout


Example: consuming the whole entity

import scala.concurrent.Future
import scala.concurrent.duration._ // enables 3.seconds

case class ExamplePerson(name: String)
def parse(line: ByteString): ExamplePerson = ??? // read Avro from the ByteString

val response: HttpResponse = ??? // get an HTTP response

// toStrict needs the implicit materializer; flatMap/map need the implicit dispatcher
val strictEntity: Future[HttpEntity.Strict] = response.entity.toStrict(3.seconds)

// while API remains the same to consume dataBytes, now they're in memory already:
val transformedData: Future[ExamplePerson] =
  strictEntity flatMap { e =>
    e.dataBytes
      .runFold(ByteString.empty) { case (acc, b) => acc ++ b }
      .map(parse)
  }
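
Since HttpEntity.Strict already holds its bytes, they can also be read directly from its data field instead of folding over dataBytes. The following is a minimal runnable sketch under that assumption, using an in-memory chunked entity in place of a real response; the object name and sample text are hypothetical.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.{ ContentTypes, HttpEntity, HttpResponse }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.util.ByteString

import scala.concurrent.{ Await, ExecutionContext }
import scala.concurrent.duration._

// Hypothetical demo object; not part of Akka HTTP.
object ToStrictSketch {
  def run(): String = {
    implicit val system: ActorSystem = ActorSystem("to-strict-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    implicit val ec: ExecutionContext = system.dispatcher
    try {
      // A streamed (chunked) entity standing in for a real server response.
      val response = HttpResponse(entity = HttpEntity(
        ContentTypes.`text/plain(UTF-8)`,
        Source(List(ByteString("hello, "), ByteString("world")))))

      val body = response.entity
        .toStrict(3.seconds)                   // collect all chunks, or fail after 3 seconds
        .map(strict => strict.data.utf8String) // the bytes are now available in memory

      // Blocking is only for the demo.
      Await.result(body, 5.seconds)
    } finally system.terminate()
  }
}
```

The trade-off is memory: toStrict is only a good fit when the entity is known to be small enough to hold in full.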


Discarding HTTP Response Entity (Client)

  • When calling REST services you might not care about the response payload
      • Example: you only need the response status code
  • You have to consume the entity anyway
      • Otherwise you put back-pressure on the underlying TCP connection
  • The discardEntityBytes method discards the entity
      • It pipes the incoming bytes into Sink.ignore


Discarding response entity

import akka.http.scaladsl.model.HttpMessage.DiscardedEntity

val response: HttpResponse = ??? // call REST service

// roughly equivalent to response.entity.dataBytes.runWith(Sink.ignore)
val discarded: DiscardedEntity = response.discardEntityBytes()
discarded.future.onComplete { done => println("Entity discarded completely!") }
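
The "I only need the status code" case can be wrapped in a small helper. This is a sketch, not an Akka HTTP API: StatusOnly and statusOf are hypothetical names, and the response is built in memory so the snippet runs without a server.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.{ HttpResponse, StatusCode, StatusCodes }
import akka.stream.{ ActorMaterializer, Materializer }

import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.concurrent.duration._

// Hypothetical demo object; not part of Akka HTTP.
object StatusOnly {
  // Drain the entity first, then hand back only the status code.
  def statusOf(response: HttpResponse)(implicit mat: Materializer, ec: ExecutionContext): Future[StatusCode] =
    response.discardEntityBytes().future.map(_ => response.status)

  def run(): StatusCode = {
    implicit val system: ActorSystem = ActorSystem("status-only")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    implicit val ec: ExecutionContext = system.dispatcher
    try {
      // Error responses carry entities too, and those must be discarded as well.
      val response = HttpResponse(StatusCodes.NotFound, entity = "no such resource")
      Await.result(statusOf(response), 3.seconds)
    } finally system.terminate()
  }
}
```

Mapping over discarded.future means the status is only reported once the body has been fully drained.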


Request-Level Client-Side API

  • The request-level API is the recommended way to use the Akka HTTP client
      • Internally it builds on the Host-Level Client-Side API
      • It is easier to use
      • It comes in a flow-based and a future-based variant

Note: "The request-level API is implemented on top of a connection pool that is shared inside the actor system. A consequence of using a pool is that long-running requests block a connection while running and starve other requests. Make sure not to use the request-level API for long-running requests like long-polling GET requests. Use the Connection-Level Client-Side API or an extra pool just for the long-running connection instead." -- Akka Docs
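
The flow-based variant is exposed as Http().superPool, a flow that takes (HttpRequest, T) pairs and emits (Try[HttpResponse], T) pairs, where T is a caller-chosen correlation value. Below is a minimal sketch, assuming absolute URIs and the explicit-materializer setup used elsewhere in these notes; FlowBasedClient and statuses are hypothetical names.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ HttpRequest, StatusCode }
import akka.stream.Materializer
import akka.stream.scaladsl.{ Sink, Source }

import scala.collection.immutable
import scala.concurrent.Future
import scala.util.Try

// Hypothetical demo object; not part of Akka HTTP.
object FlowBasedClient {
  // Runs each absolute URI through the shared pool and collects
  // (id, status-or-failure) pairs. Results may not be in request order.
  def statuses(uris: List[String])(
      implicit system: ActorSystem, mat: Materializer): Future[immutable.Seq[(Int, Try[StatusCode])]] =
    Source(uris.zipWithIndex)
      .map { case (uri, id) => HttpRequest(uri = uri) -> id }
      .via(Http().superPool[Int]())
      .map { case (result, id) =>
        id -> result.map { response =>
          response.discardEntityBytes() // only the status is needed, so drain the body
          response.status
        }
      }
      .runWith(Sink.seq)
}
```

Because results can be emitted in a different order than the requests, the Int index is what ties each status back to its URI. As the note above says, this shared pool is a poor fit for long-running requests.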


Using Akka HTTP Client - Future-Based Variant

  • For basic HTTP client needs, Akka HTTP provides the Http().singleRequest(...) method
      • Supply an HttpRequest and get back a Future[HttpResponse]
      • The request is dispatched across the cached host connection pool for the request’s effective URI
      • The request must have either an absolute URI or a valid Host header


Akka HTTP Client request with a future response

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer

import scala.concurrent.Future
import scala.util.{ Failure, Success }

object Client {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    // needed for the future flatMap/onComplete in the end
    implicit val executionContext = system.dispatcher

    val responseFuture: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "https://akka.io"))

    responseFuture
      .onComplete {
        case Success(res) =>
          println(res)
          // the entity must be consumed or discarded, even when it is not used
          res.discardEntityBytes()
        case Failure(_)   => sys.error("something wrong")
      }
  }
}


Using Future based approach with Actor

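import akka.actor.{ Actor, ActorLogging }
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.util.ByteString

class MyActor extends Actor
  with ActorLogging {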

  import akka.pattern.pipe
  import context.dispatcher

  final implicit val materializer: ActorMaterializer = ActorMaterializer(ActorMaterializerSettings(context.system))

  val http = Http(context.system)

  override def preStart() = {
    http.singleRequest(HttpRequest(uri = "https://akka.io"))
      .pipeTo(self)
  }

  def receive = {
    case HttpResponse(StatusCodes.OK, headers, entity, _) =>
      entity.dataBytes.runFold(ByteString(""))(_ ++ _).foreach { body =>
        log.info("Got response, body: " + body.utf8String)
      }
    case resp @ HttpResponse(code, _, _, _) =>
      log.info("Request failed, response code: " + code)
      resp.discardEntityBytes()
  }

}

