Akka HTTP Client API Notes
Bullet point notes on Akka HTTP Client
- Akka HTTP client-side functionality is provided by the akka-http-core module
- These notes first cover the implications of the streaming nature of request/response entities
- Worth reading first if you come from a non-streaming HTTP client background
These notes are derived from Akka HTTP Client documentation
Three different types of Akka HTTP clients
- Request-Level Client-Side API
- Akka HTTP handles connection management
- Recommended approach and probably the approach we will take
- Host-Level Client-Side API
- Akka HTTP manages a connection-pool to one specific host/port endpoint
- Recommended when you can supply a Source[HttpRequest, _] with requests to run against a single host over pooled connections
- Connection-Level Client-Side API
- Full control over the HTTP connection lifecycle (when connections are opened and closed)
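As a rough orientation, the three levels map onto three entry points on the Http() extension. The sketch below only builds the values and sends nothing. The object name ClientApiLevels, the Int correlation id, and port 80 are assumptions made for illustration, and the explicit ActorMaterializer follows the style used elsewhere in these notes (newer Akka versions can derive a materializer from the ActorSystem).

```scala
import scala.concurrent.Future
import scala.util.Try

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Flow

// Hypothetical container object, not part of Akka HTTP.
object ClientApiLevels {
  implicit val system: ActorSystem = ActorSystem("client-api-levels")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Request-level: hand over one request, Akka HTTP picks a pooled connection.
  def requestLevel(uri: String): Future[HttpResponse] =
    Http().singleRequest(HttpRequest(uri = uri))

  // Host-level: a flow backed by a connection pool to one host/port.
  // The Int is a caller-chosen id used to match responses to requests.
  def hostLevel: Flow[(HttpRequest, Int), (Try[HttpResponse], Int), Http.HostConnectionPool] =
    Http().cachedHostConnectionPool[Int]("akka.io", 80)

  // Connection-level: a flow for a single connection, opened when the flow is materialized.
  def connectionLevel: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] =
    Http().outgoingConnection("akka.io", 80)
}
```

The return types are the main thing to notice: a Future for the request level, and flows with different materialized values for the host and connection levels.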
Streaming nature of Request/Response Entities
- Akka HTTP: streaming all the way through
- Akka Streams based back-pressure mechanisms at all layers
- TCP layer
- HTTP server
- HttpRequest
- HttpResponse
- HttpEntity
- Surprising implications if you are familiar with non-streaming / non-reactive HTTP clients
- By design, not consuming the HTTP entity signals back-pressure to the other side of the connection
- Allows use of back-pressure for servers/clients
- Prevents overwhelming the application
- Avoids unneeded buffering of the HttpEntity in memory
Warning : "Consuming (or discarding) the Entity of a request is mandatory! If accidentally left neither consumed or discarded Akka HTTP will assume the incoming data should remain back-pressured, and will stall the incoming data via TCP back-pressure mechanisms. A client should consume the Entity regardless of the status of the HttpResponse."
-- Akka Docs
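To make the "nothing flows until the entity is consumed" point concrete, here is a minimal local sketch that needs no network. It builds a response by hand whose body source counts how many chunks were pulled. The object name LazyEntityDemo, the counter, and the 1000 generated lines are all made up for illustration; a real response received over TCP is back-pressured by the same demand mechanism.

```scala
import java.util.concurrent.atomic.AtomicInteger

import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.http.scaladsl.model.{ ContentTypes, HttpEntity, HttpResponse }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.util.ByteString

// Hypothetical demo object, not part of Akka HTTP.
object LazyEntityDemo {
  implicit val system: ActorSystem = ActorSystem("lazy-entity-demo")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Returns (chunks pulled before consuming, chunks pulled after discarding).
  def run(): (Int, Int) = {
    val pulled = new AtomicInteger(0)

    // Body source that records every chunk it is asked to produce.
    val body = Source(1 to 1000).map { i =>
      pulled.incrementAndGet()
      ByteString(s"line $i\n")
    }

    // Locally built response standing in for one received from a server.
    val response = HttpResponse(
      entity = HttpEntity(ContentTypes.`application/octet-stream`, body))

    // The source has not been materialized yet, so no chunk was produced.
    val before = pulled.get()

    // Discarding drains the stream to Sink.ignore; only now does data flow.
    Await.result(response.discardEntityBytes().future, 5.seconds)
    val after = pulled.get()

    (before, after)
  }
}
```

Calling LazyEntityDemo.run() should show that the count stays at zero until the entity is drained.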
HTTP Client-Side: Handle streaming HTTP Entities
Consuming HTTP Response Entity (Http Client)
- Common use-case: consuming response entity from client
- use entity.dataBytes Source
- Server-side can use directives like BasicDirectives.extractDataBytes
- Best practice: process the entity with streaming techniques, which is how streams are meant to be used
- framing the stream into incoming chunks
- parsing them line-by-line
- connecting the flow to another destination Sink or Akka Streams connector
Example using a Source from an HttpEntity
import java.io.File

import akka.actor.ActorSystem
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ FileIO, Framing }
import akka.util.ByteString

implicit val system = ActorSystem()
implicit val dispatcher = system.dispatcher
implicit val materializer = ActorMaterializer()

// placeholder: per-line transformation to apply
def transformEachLine(line: ByteString): ByteString = ???

val response: HttpResponse = ???

response.entity.dataBytes
  // split the byte stream into lines of at most 256 bytes
  .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256))
  .map(transformEachLine)
  // stream the transformed lines into a file
  .runWith(FileIO.toPath(new File("/tmp/example.out").toPath))
Consuming the whole entity
- When you want to consume the whole entity
- Example: Let's say you expect an entire list of Avro records.
- You can't read them one at a time.
- Akka HTTP provides a special method toStrict(timeout)
- Use this method to eagerly consume the entity in one go.
Example: consuming the whole entity with toStrict
import scala.concurrent.Future
import scala.concurrent.duration._

case class ExamplePerson(name: String)
def parse(line: ByteString): ExamplePerson = ??? // read Avro from the byte string

val response: HttpResponse = ??? // get an HTTP response
val strictEntity: Future[HttpEntity.Strict] = response.entity.toStrict(3.seconds)

// the API for consuming dataBytes stays the same, but the bytes are already in memory:
val transformedData: Future[ExamplePerson] =
  strictEntity flatMap { e =>
    e.dataBytes
      .runFold(ByteString.empty) { case (acc, b) => acc ++ b }
      .map(parse)
  }
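Once the entity is strict, its bytes can also be read directly through the data field of HttpEntity.Strict, without folding over dataBytes again. The following is a small sketch of that shortcut. The object name StrictEntityData, the helper bodyAsString, and the two-chunk sample response are hypothetical and built locally so the example runs without a server.

```scala
import scala.concurrent.Future
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.http.scaladsl.model.{ ContentTypes, HttpEntity, HttpResponse }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.util.ByteString

// Hypothetical helper object, not part of Akka HTTP.
object StrictEntityData {
  implicit val system: ActorSystem = ActorSystem("strict-entity-data")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  // Collects the whole entity (within 3 seconds) and decodes it as UTF-8.
  def bodyAsString(response: HttpResponse): Future[String] =
    response.entity.toStrict(3.seconds).map(strict => strict.data.utf8String)

  // Locally built streamed (chunked) response used as sample input.
  def sample: HttpResponse =
    HttpResponse(entity = HttpEntity(
      ContentTypes.`text/plain(UTF-8)`,
      Source(List(ByteString("Ada\n"), ByteString("Grace")))))
}
```

Keep in mind that toStrict loads the full body into memory, so this only suits payloads known to be reasonably small.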
Discarding HTTP Response Entity (Client)
- When calling REST services you might not care about the response payload
- example: you just need the response code
- You have to consume the entity anyway
- Otherwise you put back-pressure on the underlying TCP connection
- Use the discardEntityBytes method to discard the entity
- It pipes the incoming bytes to Sink.ignore
Discarding response entity
import akka.http.scaladsl.model.HttpMessage.DiscardedEntity

val response: HttpResponse = ??? // call REST service

// equivalent to response.entity.dataBytes.runWith(Sink.ignore)
val discarded: DiscardedEntity = response.discardEntityBytes()
discarded.future.onComplete { done => println("Entity discarded completely!") }
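For the "I only need the response code" case, the discard step can be wrapped in a small helper that returns the status once the entity has been fully drained. This is a sketch under assumptions: the object name StatusOnly and the helper statusOnly are made up for illustration and are not part of Akka HTTP.

```scala
import scala.concurrent.Future

import akka.actor.ActorSystem
import akka.http.scaladsl.model.{ HttpResponse, StatusCode }
import akka.stream.ActorMaterializer

// Hypothetical helper object, not part of Akka HTTP.
object StatusOnly {
  implicit val system: ActorSystem = ActorSystem("status-only")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  // Drains the entity to Sink.ignore, then yields only the status code.
  def statusOnly(response: HttpResponse): Future[StatusCode] =
    response.discardEntityBytes().future.map(_ => response.status)
}
```

With a real request this would typically be chained as Http().singleRequest(request).flatMap(StatusOnly.statusOnly), so the connection is released no matter which status comes back.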
Request-Level Client-Side API
- Akka HTTP Client: request-level API is recommended
- Internally uses Host-Level Client-Side API
- Easier to use
- Available in a flow-based and a future-based variant
Note : "The request-level API is implemented on top of a connection pool that is shared inside the actor system. A consequence of using a pool is that long-running requests block a connection while running and starve other requests. Make sure not to use the request-level API for long-running requests like long-polling GET requests. Use the Connection-Level Client-Side API or an extra pool just for the long-running connection instead." -- Akka Docs
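The flow-based variant has no example in these notes, so here is a minimal sketch using Http().superPool, which routes each request to the pool for its effective URI. The object name FlowBasedClient, the statuses helper, and the choice of the URI string as correlation value are assumptions for illustration. Each response entity is discarded because only the status is reported.

```scala
import scala.concurrent.Future
import scala.util.{ Failure, Success, Try }

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Flow, Sink, Source }

// Hypothetical example object, not part of Akka HTTP.
object FlowBasedClient {
  implicit val system: ActorSystem = ActorSystem("flow-based-client")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // The String travels with each request so the response can be matched to it.
  def pool: Flow[(HttpRequest, String), (Try[HttpResponse], String), NotUsed] =
    Http().superPool[String]()

  // Runs all requests through the shared pool and reports one line per URI.
  def statuses(uris: List[String]): Future[Seq[String]] =
    Source(uris)
      .map(uri => HttpRequest(uri = uri) -> uri)
      .via(pool)
      .map {
        case (Success(response), uri) =>
          response.discardEntityBytes() // always consume or discard the entity
          s"$uri -> ${response.status}"
        case (Failure(error), uri) =>
          s"$uri failed: ${error.getMessage}"
      }
      .runWith(Sink.seq)
}
```

Responses may arrive in a different order than the requests were sent, which is why the correlation value is carried alongside each request.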
Using Akka HTTP Client - Future-Based Variant
- For basic HTTP client needs
- Akka HTTP provides the Http().singleRequest(...) method
- supply an HttpRequest and get a Future[HttpResponse]
- the request is dispatched across the cached host connection pool for the request’s effective URI
- the request must have either an absolute URI or a valid Host header, otherwise the returned future completes with an error
Akka HTTP Client request with a future response
import scala.concurrent.Future
import scala.util.{ Failure, Success }

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer

object Client {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    // needed for the future flatMap/onComplete in the end
    implicit val executionContext = system.dispatcher

    val responseFuture: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "https://akka.io"))

    responseFuture
      .onComplete {
        case Success(res) =>
          println(res)
          res.discardEntityBytes() // the entity must be consumed or discarded
        case Failure(_) => sys.error("something wrong")
      }
  }
}
Using Future based approach with Actor
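import akka.actor.{ Actor, ActorLogging }
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.util.ByteString

class MyActor extends Actor
  with ActorLogging {

  import akka.pattern.pipe
  import context.dispatcher

  final implicit val materializer: ActorMaterializer = ActorMaterializer(ActorMaterializerSettings(context.system))

  val http = Http(context.system)

  override def preStart() = {
    // deliver the eventual response (or failure) to this actor as a message
    http.singleRequest(HttpRequest(uri = "https://akka.io"))
      .pipeTo(self)
  }

  def receive = {
    case HttpResponse(StatusCodes.OK, headers, entity, _) =>
      // consume the entity by folding all chunks into one ByteString
      entity.dataBytes.runFold(ByteString(""))(_ ++ _).foreach { body =>
        log.info("Got response, body: " + body.utf8String)
      }
    case resp @ HttpResponse(code, _, _, _) =>
      log.info("Request failed, response code: " + code)
      // discard the entity even though the request failed
      resp.discardEntityBytes()
  }
}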