Akka(33): Http: Marshalling to Json

Source: reposted

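Akka-http is a tool for system integration, which relies mainly on exchanging data between systems. Because the way a program represents data internally differs from the format sent over the network, high-level structured data must be converted (marshalled, or serialized) into a format that can be transmitted. Since heterogeneous systems may be involved, the wire format should be a public standard that every party can parse. Json is one such format that is widely accepted in the industry today. The conversion has to work in both directions: data received from the network must also be converted back into the program's high-level structured data.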

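In Akka-http, marshalling means converting an instance of a high-level type T into a simpler target type such as MessageEntity, which represents the data part (entity-body) of an http message. Akka-http uses the Marshaller[A,B] type to convert an instance of type A into an instance of type B. Marshaller[A,B] is defined as follows: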

sealed abstract class Marshaller[-A, +B] {
  def apply(value: A)(implicit ec: ExecutionContext): Future[List[Marshalling[B]]]

  def map[C](f: B ⇒ C): Marshaller[A, C] =
    Marshaller(implicit ec ⇒ value ⇒ this(value).fast map (_ map (_ map f)))
  ...
}

//#marshaller-creation
object Marshaller
  extends GenericMarshallers
  with PredefinedToEntityMarshallers
  with PredefinedToResponseMarshallers
  with PredefinedToRequestMarshallers {

  /**
   * Creates a [[Marshaller]] from the given function.
   */
  def apply[A, B](f: ExecutionContext ⇒ A ⇒ Future[List[Marshalling[B]]]): Marshaller[A, B] =
    new Marshaller[A, B] {
      def apply(value: A)(implicit ec: ExecutionContext) =
        try f(ec)(value)
        catch { case NonFatal(e) ⇒ FastFuture.failed(e) }
    }
  ...
}
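To make the shape of this signature concrete, here is a minimal sketch that uses only the Scala standard library. It is a simplified model, not the Akka-http implementation: `Alternative` is a hypothetical stand-in for Marshalling[B], content types are plain strings, and `MarshallerSketch`, `pointMarshaller` and `demo` are names invented for the illustration.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object MarshallerSketch {
  // Hypothetical stand-in for Marshalling[B]: a content type (a plain String here)
  // plus a deferred rendering function.
  final case class Alternative[+B](contentType: String, marshal: () => B) {
    def map[C](f: B => C): Alternative[C] = Alternative(contentType, () => f(marshal()))
  }

  // Same shape as the real signature: asynchronous, and possibly several alternatives.
  trait Marshaller[-A, +B] { self =>
    def apply(value: A)(implicit ec: ExecutionContext): Future[List[Alternative[B]]]

    // map reaches through the Future, the List and the Alternative.
    def map[C](f: B => C): Marshaller[A, C] = new Marshaller[A, C] {
      def apply(value: A)(implicit ec: ExecutionContext): Future[List[Alternative[C]]] =
        self(value).map(_.map(_.map(f)))
    }
  }

  // A marshaller that offers two renderings of a point.
  val pointMarshaller: Marshaller[(Int, Int), String] = new Marshaller[(Int, Int), String] {
    def apply(p: (Int, Int))(implicit ec: ExecutionContext): Future[List[Alternative[String]]] =
      Future.successful(List(
        Alternative("application/json", () => s"""{"x":${p._1},"y":${p._2}}"""),
        Alternative("text/plain", () => s"${p._1},${p._2}")))
  }

  def demo(): (List[String], Int) = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val lengths: Marshaller[(Int, Int), Int] = pointMarshaller.map(_.length)
    val alternatives = Await.result(lengths((1, 2)), 1.second) // blocking is for the demo only
    (alternatives.map(_.contentType), alternatives.head.marshal())
  }
}
```

Calling `MarshallerSketch.demo()` lists both offered content types and renders only the first alternative, mapped to its length.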

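The factory function apply[A,B] wraps a function A=>Future[List[Marshalling[B]]]. The simpler and more direct A=>B is not used because:

1. Marshalling is asynchronous and non-blocking, so the result is a Future[??].

2. There may be several possible target formats, such as XML or Json, so the alternatives are expressed as a List[??].

3. To inspect or modify properties of the data before the final target format is produced, an intermediate result is needed between the source and the target data. Marshalling[B] is that intermediate abstraction. Through the Marshalling type, properties of the data can be read or changed before the data is actually produced: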

/**
 * Describes one possible option for marshalling a given value.
 */
sealed trait Marshalling[+A] {
  def map[B](f: A ⇒ B): Marshalling[B]

  /**
   * Converts this marshalling to an opaque marshalling, i.e. a marshalling result that
   * does not take part in content type negotiation. The given charset is used if this
   * instance is a `WithOpenCharset` marshalling.
   */
  def toOpaque(charset: HttpCharset): Marshalling[A]
}

object Marshalling {

  /**
   * A Marshalling to a specific [[akka.http.scaladsl.model.ContentType]].
   */
  final case class WithFixedContentType[A](
    contentType: ContentType,
    marshal:     () ⇒ A) extends Marshalling[A] {
    def map[B](f: A ⇒ B): WithFixedContentType[B] = copy(marshal = () ⇒ f(marshal()))
    def toOpaque(charset: HttpCharset): Marshalling[A] = Opaque(marshal)
  }

  /**
   * A Marshalling to a specific [[akka.http.scaladsl.model.MediaType]] with a flexible charset.
   */
  final case class WithOpenCharset[A](
    mediaType: MediaType.WithOpenCharset,
    marshal:   HttpCharset ⇒ A) extends Marshalling[A] {
    def map[B](f: A ⇒ B): WithOpenCharset[B] = copy(marshal = cs ⇒ f(marshal(cs)))
    def toOpaque(charset: HttpCharset): Marshalling[A] = Opaque(() ⇒ marshal(charset))
  }

  /**
   * A Marshalling to an unknown MediaType and charset.
   * Circumvents content negotiation.
   */
  final case class Opaque[A](marshal: () ⇒ A) extends Marshalling[A] {
    def map[B](f: A ⇒ B): Opaque[B] = copy(marshal = () ⇒ f(marshal()))
    def toOpaque(charset: HttpCharset): Marshalling[A] = this
  }
}
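The point of this intermediate layer is that the content type can be inspected, and the result mapped, before any data is rendered. The following standard-library sketch illustrates that laziness under simplifying assumptions: content types and charsets are plain strings, Opaque is left out, and `MarshallingSketch`, `contentTypeOf`, `render` and the render counter are hypothetical names that do not exist in Akka-http.

```scala
object MarshallingSketch {
  // Simplified model: content types and charsets are plain strings (an assumption of this sketch).
  sealed trait Marshalling[+A] {
    def map[B](f: A => B): Marshalling[B]
  }
  final case class WithFixedContentType[A](contentType: String, marshal: () => A) extends Marshalling[A] {
    def map[B](f: A => B): WithFixedContentType[B] = copy(marshal = () => f(marshal()))
  }
  final case class WithOpenCharset[A](mediaType: String, marshal: String => A) extends Marshalling[A] {
    def map[B](f: A => B): WithOpenCharset[B] = copy(marshal = cs => f(marshal(cs)))
  }

  // Counts how often the rendering function has actually run.
  private var renderCount = 0
  def rendered: Int = renderCount

  val user: Marshalling[String] =
    WithFixedContentType("application/json", () => { renderCount += 1; """{"id":1}""" })

  // Inspect the content type without rendering anything.
  def contentTypeOf[A](m: Marshalling[A]): String = m match {
    case WithFixedContentType(ct, _) => ct
    case WithOpenCharset(mt, _)      => mt
  }

  // Force the rendering, choosing UTF-8 where the charset is open.
  def render[A](m: Marshalling[A]): A = m match {
    case WithFixedContentType(_, marshal) => marshal()
    case WithOpenCharset(_, marshal)      => marshal("UTF-8")
  }
}
```

Mapping `user` and reading its content type leave the counter at zero; only `render` runs the deferred function.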

Inside a Marshalling we can work with the content type of the message (message-content-type). For convenience, Akka-http provides the following type aliases:

type ToEntityMarshaller[T] = Marshaller[T, MessageEntity]
type ToByteStringMarshaller[T] = Marshaller[T, ByteString]
type ToHeadersAndEntityMarshaller[T] = Marshaller[T, (immutable.Seq[HttpHeader], MessageEntity)]
type ToResponseMarshaller[T] = Marshaller[T, HttpResponse]
type ToRequestMarshaller[T] = Marshaller[T, HttpRequest]

They are essentially grouped by target type. Akka-http ships with predefined marshallers for many types:

PredefinedToEntityMarshallers
  Array[Byte]
  ByteString
  Array[Char]
  String
  akka.http.scaladsl.model.FormData
  akka.http.scaladsl.model.MessageEntity
  T <: akka.http.scaladsl.model.Multipart

PredefinedToResponseMarshallers
  T, if a ToEntityMarshaller[T] is available
  HttpResponse
  StatusCode
  (StatusCode, T), if a ToEntityMarshaller[T] is available
  (Int, T), if a ToEntityMarshaller[T] is available
  (StatusCode, immutable.Seq[HttpHeader], T), if a ToEntityMarshaller[T] is available
  (Int, immutable.Seq[HttpHeader], T), if a ToEntityMarshaller[T] is available

PredefinedToRequestMarshallers
  HttpRequest
  Uri
  (HttpMethod, Uri, T), if a ToEntityMarshaller[T] is available
  (HttpMethod, Uri, immutable.Seq[HttpHeader], T), if a ToEntityMarshaller[T] is available

GenericMarshallers
  Marshaller[Throwable, T]
  Marshaller[Option[A], B], if a Marshaller[A, B] and an EmptyValue[B] is available
  Marshaller[Either[A1, A2], B], if a Marshaller[A1, B] and a Marshaller[A2, B] is available
  Marshaller[Future[A], B], if a Marshaller[A, B] is available
  Marshaller[Try[A], B], if a Marshaller[A, B] is available
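The "if ... is available" entries in this list are derived instances: a marshaller for a wrapper type is built from the marshallers of its parts. The sketch below shows how such a derivation can work with implicits, using only the standard library. It assumes a synchronous, single-result `Marshaller` and a small `Marshal` helper, both simplified stand-ins for the real types, and `GenericMarshallerSketch` is a hypothetical name.

```scala
object GenericMarshallerSketch {
  // Synchronous, single-result stand-ins for the real types (assumptions of this sketch).
  trait Marshaller[A, B] { def apply(value: A): B }
  trait EmptyValue[B] { def empty: B }

  object EmptyValue {
    implicit val emptyString: EmptyValue[String] = new EmptyValue[String] { def empty: String = "" }
  }

  object Marshaller {
    implicit val intToString: Marshaller[Int, String] =
      new Marshaller[Int, String] { def apply(value: Int): String = value.toString }
    implicit val booleanToString: Marshaller[Boolean, String] =
      new Marshaller[Boolean, String] { def apply(value: Boolean): String = if (value) "yes" else "no" }

    // Marshaller[Option[A], B], if a Marshaller[A, B] and an EmptyValue[B] is available
    implicit def optionMarshaller[A, B](implicit m: Marshaller[A, B], e: EmptyValue[B]): Marshaller[Option[A], B] =
      new Marshaller[Option[A], B] {
        def apply(value: Option[A]): B = value match {
          case Some(a) => m(a)
          case None    => e.empty
        }
      }

    // Marshaller[Either[A1, A2], B], if a Marshaller[A1, B] and a Marshaller[A2, B] is available
    implicit def eitherMarshaller[A1, A2, B](implicit m1: Marshaller[A1, B], m2: Marshaller[A2, B]): Marshaller[Either[A1, A2], B] =
      new Marshaller[Either[A1, A2], B] {
        def apply(value: Either[A1, A2]): B = value match {
          case Left(a)  => m1(a)
          case Right(b) => m2(b)
        }
      }
  }

  final class Marshal[A](value: A) {
    def to[B](implicit m: Marshaller[A, B]): B = m(value)
  }

  val some  = new Marshal(Option(42)).to[String]
  val none  = new Marshal(Option.empty[Int]).to[String]
  val left  = new Marshal[Either[Int, Boolean]](Left(7)).to[String]
  val right = new Marshal[Either[Int, Boolean]](Right(true)).to[String]
}
```

No instance for Option[Int] or Either[Int, Boolean] is written by hand; the compiler assembles them from the instances for Int and Boolean.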

Akka-http also provides a helper class, Marshal:

class Marshal[A](val value: A) {
  /**
   * Marshals `value` using the first available [[Marshalling]] for `A` and `B` provided by the given [[Marshaller]].
   * If the marshalling is flexible with regard to the used charset `UTF-8` is chosen.
   */
  def to[B](implicit m: Marshaller[A, B], ec: ExecutionContext): Future[B] =
    m(value).fast.map {
      _.head match {
        case Marshalling.WithFixedContentType(_, marshal) ⇒ marshal()
        case Marshalling.WithOpenCharset(_, marshal) ⇒ marshal(HttpCharsets.`UTF-8`)
        case Marshalling.Opaque(marshal) ⇒ marshal()
      }
    }

  /**
   * Marshals `value` to an `HttpResponse` for the given `HttpRequest` with full content-negotiation.
   */
  def toResponseFor(request: HttpRequest)(implicit m: ToResponseMarshaller[A], ec: ExecutionContext): Future[HttpResponse] = {
    import akka.http.scaladsl.marshalling.Marshal._
    val ctn = ContentNegotiator(request.headers)

    m(value).fast.map { marshallings ⇒
      val supportedAlternatives: List[ContentNegotiator.Alternative] =
        marshallings.collect {
          case Marshalling.WithFixedContentType(ct, _) ⇒ ContentNegotiator.Alternative(ct)
          case Marshalling.WithOpenCharset(mt, _) ⇒ ContentNegotiator.Alternative(mt)
        }(collection.breakOut)
      val bestMarshal = {
        if (supportedAlternatives.nonEmpty) {
          ctn.pickContentType(supportedAlternatives).flatMap {
            case best @ (_: ContentType.Binary | _: ContentType.WithFixedCharset | _: ContentType.WithMissingCharset) ⇒
              marshallings collectFirst { case Marshalling.WithFixedContentType(`best`, marshal) ⇒ marshal }
            case best @ ContentType.WithCharset(bestMT, bestCS) ⇒
              marshallings collectFirst {
                case Marshalling.WithFixedContentType(`best`, marshal) ⇒ marshal
                case Marshalling.WithOpenCharset(`bestMT`, marshal) ⇒ () ⇒ marshal(bestCS)
              }
          }
        } else None
      } orElse {
        marshallings collectFirst { case Marshalling.Opaque(marshal) ⇒ marshal }
      } getOrElse {
        throw UnacceptableResponseContentTypeException(supportedAlternatives.toSet)
      }
      bestMarshal()
    }
  }
}
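The selection logic in toResponseFor can be read as three steps: prefer a marshalling whose content type the client accepts, otherwise fall back to an Opaque one, otherwise fail. The sketch below models just that control flow with the standard library. It is a deliberately reduced illustration: `accepted` is a plain set of strings standing in for the parsed Accept header, q-values, wildcards and charsets are ignored, and `NegotiationSketch` and `respond` are hypothetical names.

```scala
object NegotiationSketch {
  sealed trait Marshalling { def marshal: () => String }
  final case class WithFixedContentType(contentType: String, marshal: () => String) extends Marshalling
  final case class Opaque(marshal: () => String) extends Marshalling

  // `accepted` stands in for the parsed Accept header; q-values and wildcards are ignored here.
  def respond(accepted: Set[String], marshallings: List[Marshalling]): Either[String, String] = {
    val supported = marshallings.collect { case WithFixedContentType(ct, _) => ct }
    // Step 1: a typed marshalling the client accepts.
    val negotiated = marshallings.collectFirst {
      case WithFixedContentType(ct, marshal) if accepted.contains(ct) => marshal
    }
    // Step 2: an opaque marshalling bypasses negotiation.
    val opaque = marshallings.collectFirst { case Opaque(marshal) => marshal }
    // Step 3: otherwise report what would have been possible.
    val supportedList = supported.mkString(", ")
    negotiated.orElse(opaque) match {
      case Some(marshal) => Right(marshal())
      case None          => Left(s"unacceptable, supported: $supportedList")
    }
  }

  val plainText: List[Marshalling] = List(WithFixedContentType("text/plain", () => "Plaintext"))
}
```

With `plainText` on offer, a client that accepts only application/json gets the Left case, which corresponds to the UnacceptableResponseContentTypeException thrown in the real code.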

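With Marshal.to and toResponseFor(request) we can marshal instances of the types supported out of the box into a Future of the target type. Server directives such as complete, on the other hand, take a ToResponseMarshallable, from which the HttpResponse is built: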

/**
 * Completes the request using the given arguments.
 *
 * @group route
 */
def complete(m: ⇒ ToResponseMarshallable): StandardRoute =
  StandardRoute(_.complete(m))

The implicit conversion to ToResponseMarshallable is provided by its companion object:

/** Something that can later be marshalled into a response */
trait ToResponseMarshallable {
  type T
  def value: T
  implicit def marshaller: ToResponseMarshaller[T]

  def apply(request: HttpRequest)(implicit ec: ExecutionContext): Future[HttpResponse] =
    Marshal(value).toResponseFor(request)
}

object ToResponseMarshallable {
  implicit def apply[A](_value: A)(implicit _marshaller: ToResponseMarshaller[A]): ToResponseMarshallable =
    new ToResponseMarshallable {
      type T = A
      def value: T = _value
      def marshaller: ToResponseMarshaller[T] = _marshaller
    }

  implicit val marshaller: ToResponseMarshaller[ToResponseMarshallable] =
    Marshaller { implicit ec ⇒ marshallable ⇒ marshallable.marshaller(marshallable.value) }
}
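This arrangement, where a method takes a wrapper type and an implicit conversion in the wrapper's companion captures both the value and its type-class instance, is often called the magnet pattern. Here is a minimal standard-library sketch of the idea. `Renderer`, `Renderable` and this `complete` are hypothetical stand-ins for ToResponseMarshaller, ToResponseMarshallable and the real directive, and rendering to a String replaces building an HttpResponse.

```scala
import scala.language.implicitConversions

object MagnetSketch {
  // Type class: how to render an A (plays the role of ToResponseMarshaller[A]).
  trait Renderer[A] { def render(value: A): String }
  object Renderer {
    implicit val stringRenderer: Renderer[String] =
      new Renderer[String] { def render(value: String): String = value }
    implicit val intRenderer: Renderer[Int] =
      new Renderer[Int] { def render(value: Int): String = value.toString }
  }

  // The "magnet": a value bundled with the Renderer that was found for it.
  trait Renderable { def apply(): String }
  object Renderable {
    implicit def fromValue[A](value: A)(implicit renderer: Renderer[A]): Renderable =
      new Renderable { def apply(): String = renderer.render(value) }
  }

  // Like `complete`, this takes the magnet, not the raw value.
  def complete(m: => Renderable): String = m()
}
```

`MagnetSketch.complete("hello")` and `MagnetSketch.complete(42)` compile because a Renderer exists for String and Int; `MagnetSketch.complete(1.5)` would be rejected at compile time, since no Renderer[Double] is in implicit scope.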

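As long as an implicit ToResponseMarshaller[A] instance can be found in implicit scope, a value of type A satisfies the parameter of complete. Here are some examples of using Marshal: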

import akka.util.ByteString
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
// Marshal.to and toResponseFor need an implicit ExecutionContext
import scala.concurrent.ExecutionContext.Implicits.global

object Marshalling {
  val string = "Yeah"
  val entityFuture = Marshal(string).to[MessageEntity]

  val errorMsg = "Easy, pal!"
  val responseFuture = Marshal(420 -> errorMsg).to[HttpResponse]

  val request = HttpRequest(headers = List(headers.Accept(MediaTypes.`application/json`)))
  val responseText = "Plaintext"
  // with content negotiation: the client accepts only Json but only text/plain is on offer,
  // so this Future fails with UnacceptableResponseContentTypeException
  val respFuture = Marshal(responseText).toResponseFor(request)

  // Left commented out, as in the full listing at the end of the article:
  // there appears to be no predefined marshaller for these conversions.
  // val bsFuture = Marshal("oh my!").to[ByteString]
  // val reqFuture = Marshal("can you?").to[HttpRequest]
  // val resp = reqFuture.flatMap {r => Marshal("ok").toResponseFor(r)}
}

What about a user-defined type U, for which no predefined Marshaller[U,B] can exist? Take some simple case classes:

case class User(id: Int, name: String)
case class Item(id: Int, name: String, price: Double)

val john = Marshal(User(1,"John")).to[MessageEntity]
val fruit = Marshal(Item(1,"banana", 3.5)).to[MessageEntity]

val route =
  get {
    path("items") {
      complete(fruit)
    } ~
    path("users") {
      complete(john)
    }
  }

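Syntactically, turning User and Item instances into a ToResponseMarshallable looks straightforward. The implicit conversion, however, requires a Marshaller[U,B] for the type, and it is this marshaller that builds the wire-format response message, so the code above compiles only once such an instance is in implicit scope. The message itself is expressed in a data description language such as Json or XML. Akka-http directly supports the Spray-Json library through the akka-http-spray-json module. Reading and writing Json goes through the RootJsonFormat[T] interface: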

/**
 * A special JsonFormat signaling that the format produces a legal JSON root object, i.e. either a JSON array
 * or a JSON object.
 */
trait RootJsonFormat[T] extends JsonFormat[T] with RootJsonReader[T] with RootJsonWriter[T]

RootJsonFormat[T] represents the Json conversion for instances of type T. Its parent traits are:

/**
 * Provides the JSON deserialization and serialization for type T.
 */
trait JsonFormat[T] extends JsonReader[T] with JsonWriter[T]

/**
 * A special JsonReader capable of reading a legal JSON root object, i.e. either a JSON array or a JSON object.
 */
@implicitNotFound(msg = "Cannot find RootJsonReader or RootJsonFormat type class for ${T}")
trait RootJsonReader[T] extends JsonReader[T]

/**
 * A special JsonWriter capable of writing a legal JSON root object, i.e. either a JSON array or a JSON object.
 */
@implicitNotFound(msg = "Cannot find RootJsonWriter or RootJsonFormat type class for ${T}")
trait RootJsonWriter[T] extends JsonWriter[T]

These in turn extend the traits that do the actual Json reading and writing:

/**
 * Provides the JSON deserialization for type T.
 */
@implicitNotFound(msg = "Cannot find JsonReader or JsonFormat type class for ${T}")
trait JsonReader[T] {
  def read(json: JsValue): T
}

object JsonReader {
  implicit def func2Reader[T](f: JsValue => T): JsonReader[T] = new JsonReader[T] {
    def read(json: JsValue) = f(json)
  }
}

/**
 * Provides the JSON serialization for type T.
 */
@implicitNotFound(msg = "Cannot find JsonWriter or JsonFormat type class for ${T}")
trait JsonWriter[T] {
  def write(obj: T): JsValue
}

object JsonWriter {
  implicit def func2Writer[T](f: T => JsValue): JsonWriter[T] = new JsonWriter[T] {
    def write(obj: T) = f(obj)
  }
}

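Their companion objects provide implicit conversions from a function JsValue=>T to JsonReader[T] and from T=>JsValue to JsonWriter[T]. Akka-http's Json solution is a typical type-class pattern, a form of ad-hoc polymorphism that lets functionality be added to a type after the fact. Its hallmark is that implicit instances implementing the required functionality for each type are made available in implicit scope. A concrete example: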

trait Formats extends SprayJsonSupport with DefaultJsonProtocol

object Converters extends Formats {
  case class User(id: Int, name: String)
  case class Item(id: Int, name: String, price: Double)
  implicit val itemFormat = jsonFormat3(Item.apply)
  implicit val userFormat = jsonFormat2(User.apply)
}
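The type-class pattern itself does not depend on Spray-Json. As an illustration, here is a minimal sketch using only the standard library. It is an assumption-laden toy: this `JsonWriter` renders straight to a String rather than to a JsValue, strings are not escaped, and `TypeClassSketch` and `toJson` are names invented for the example.

```scala
object TypeClassSketch {
  // The type class: "T can be written as Json text".
  trait JsonWriter[T] { def write(obj: T): String }

  object JsonWriter {
    implicit val intWriter: JsonWriter[Int] =
      new JsonWriter[Int] { def write(obj: Int): String = obj.toString }
    implicit val stringWriter: JsonWriter[String] =
      new JsonWriter[String] { def write(obj: String): String = "\"" + obj + "\"" } // no escaping in this sketch
    // Derived instance: a List[T] can be written whenever T can.
    implicit def listWriter[T](implicit w: JsonWriter[T]): JsonWriter[List[T]] =
      new JsonWriter[List[T]] { def write(obj: List[T]): String = obj.map(w.write).mkString("[", ",", "]") }
  }

  // The generic operation picks the instance from implicit scope.
  def toJson[T](value: T)(implicit w: JsonWriter[T]): String = w.write(value)

  final case class User(id: Int, name: String)
  object User {
    // Functionality added to User without changing the User class itself.
    implicit val userWriter: JsonWriter[User] = new JsonWriter[User] {
      def write(u: User): String = s"""{"id":${toJson(u.id)},"name":${toJson(u.name)}}"""
    }
  }
}
```

`TypeClassSketch.toJson(List(TypeClassSketch.User(1, "John")))` works without any instance being passed explicitly: the compiler combines listWriter with userWriter.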

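jsonFormatXX is the Json read/write implementation provided by Spray-Json. Placing these implicit instances in the current implicit scope is all it takes to hook into Akka-http. Here is the definition of jsonFormat: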

trait ProductFormatsInstances { self: ProductFormats with StandardFormats =>
  // Case classes with 1 parameters
  def jsonFormat1[P1 :JF, T <: Product :ClassManifest](construct: (P1) => T): RootJsonFormat[T] = {
    val Array(p1) = extractFieldNames(classManifest[T])
    jsonFormat(construct, p1)
  }

  def jsonFormat[P1 :JF, T <: Product](construct: (P1) => T, fieldName1: String): RootJsonFormat[T] = new RootJsonFormat[T]{
    def write(p: T) = {
      val fields = new collection.mutable.ListBuffer[(String, JsValue)]
      fields.sizeHint(1 * 2)
      fields ++= productElement2Field[P1](fieldName1, p, 0)
      JsObject(fields: _*)
    }
    def read(value: JsValue) = {
      val p1V = fromField[P1](value, fieldName1)
      construct(p1V)
    }
  }
  ...
}
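The same construction generalises to any number of fields: the format of a case class is assembled from the formats of its fields, their names, and the constructor. Below is a standard-library sketch for two fields. It is not Spray-Json code: the tiny `JsValue` hierarchy, `JsonFormat` and `format2` are simplified stand-ins defined here, and field names are passed explicitly instead of being extracted by reflection.

```scala
object ProductFormatSketch {
  // A tiny Json model, just enough for the example.
  sealed trait JsValue
  final case class JsNumber(value: BigDecimal) extends JsValue
  final case class JsString(value: String) extends JsValue
  final case class JsObject(fields: Map[String, JsValue]) extends JsValue

  trait JsonFormat[T] {
    def write(obj: T): JsValue
    def read(json: JsValue): T
  }

  implicit val intFormat: JsonFormat[Int] = new JsonFormat[Int] {
    def write(x: Int): JsValue = JsNumber(BigDecimal(x))
    def read(json: JsValue): Int = json match {
      case JsNumber(x) => x.toInt
      case other       => sys.error(s"Expected Int as JsNumber, but got $other")
    }
  }
  implicit val stringFormat: JsonFormat[String] = new JsonFormat[String] {
    def write(x: String): JsValue = JsString(x)
    def read(json: JsValue): String = json match {
      case JsString(x) => x
      case other       => sys.error(s"Expected String as JsString, but got $other")
    }
  }

  // Builds the format of a two-field case class from the formats of its fields.
  def format2[P1, P2, T <: Product](construct: (P1, P2) => T, name1: String, name2: String)(
      implicit f1: JsonFormat[P1], f2: JsonFormat[P2]): JsonFormat[T] = new JsonFormat[T] {
    def write(obj: T): JsValue = JsObject(Map(
      name1 -> f1.write(obj.productElement(0).asInstanceOf[P1]),
      name2 -> f2.write(obj.productElement(1).asInstanceOf[P2])))
    def read(json: JsValue): T = json match {
      case JsObject(fields) => construct(f1.read(fields(name1)), f2.read(fields(name2)))
      case other            => sys.error(s"Expected JsObject, but got $other")
    }
  }

  final case class User(id: Int, name: String)
  implicit val userFormat: JsonFormat[User] = format2(User.apply, "id", "name")
}
```

Writing a User and reading the result back yields the original value, which is the round trip a read/write pair is expected to support.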

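As we can see, jsonFormat returns a RootJsonFormat[T]. Given a case class T, jsonFormat yields the two concrete Json functions read(value: JsValue) and write(p: T). The types for which Spray-Json predefines Json conversions fall into the following groups: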

/**
 * Provides all the predefined JsonFormats.
 */
trait DefaultJsonProtocol
  extends BasicFormats
  with StandardFormats
  with CollectionFormats
  with ProductFormats
  with AdditionalFormats

object DefaultJsonProtocol extends DefaultJsonProtocol

For example, BasicFormats:

/**
 * Provides the JsonFormats for the most important Scala types.
 */
trait BasicFormats {
  implicit object IntJsonFormat extends JsonFormat[Int] {
    def write(x: Int) = JsNumber(x)
    def read(value: JsValue) = value match {
      case JsNumber(x) => x.intValue
      case x => deserializationError("Expected Int as JsNumber, but got " + x)
    }
  }
  ...
}

The Json conversions for these types are concrete read/write operations. The SprayJsonSupport trait provides the final link to Marshaller[U,B]:

/**
 * A trait providing automatic to and from JSON marshalling/unmarshalling using an in-scope *spray-json* protocol.
 */
trait SprayJsonSupport {
  ...
  implicit def sprayJsonUnmarshaller[T](implicit reader: RootJsonReader[T]): FromEntityUnmarshaller[T] =
    sprayJsValueUnmarshaller.map(jsonReader[T].read)
  ...
  //#sprayJsonMarshallerConverter
  implicit def sprayJsonMarshaller[T](implicit writer: RootJsonWriter[T], printer: JsonPrinter = CompactPrinter): ToEntityMarshaller[T] =
    sprayJsValueMarshaller compose writer.write
  ...
}
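The key step in sprayJsonMarshaller is compose: an existing marshaller for JsValue is reused for any T by first applying writer.write. The sketch below shows that input-side composition with the standard library only. It assumes a synchronous `Marshaller`, a hypothetical `Entity` case class in place of MessageEntity, and Json text represented as a String; `ComposeSketch`, `jsonTextMarshaller` and `writeUser` are illustrative names.

```scala
object ComposeSketch {
  // Hypothetical stand-in for MessageEntity.
  final case class Entity(contentType: String, body: String)

  trait Marshaller[-A, +B] { self =>
    def apply(value: A): B

    // Pre-process the input: turn a Marshaller[A, B] into a Marshaller[C, B].
    def compose[C](f: C => A): Marshaller[C, B] = new Marshaller[C, B] {
      def apply(value: C): B = self(f(value))
    }
  }

  // Plays the role of sprayJsValueMarshaller: Json text in, entity out.
  val jsonTextMarshaller: Marshaller[String, Entity] = new Marshaller[String, Entity] {
    def apply(json: String): Entity = Entity("application/json", json)
  }

  final case class User(id: Int, name: String)

  // Plays the role of writer.write (no string escaping in this sketch).
  def writeUser(u: User): String = s"""{"id":${u.id},"name":"${u.name}"}"""

  // One compose call yields a marshaller for the user-defined type.
  val userMarshaller: Marshaller[User, Entity] = jsonTextMarshaller.compose[User](writeUser)
}
```

This is why a single implicit format per type is enough: the entity-producing part is written once and shared.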

ToEntityMarshaller[T], mentioned above, is an alias of Marshaller[A,B]; FromEntityUnmarshaller[T] is the corresponding alias of Unmarshaller[A,B] for the opposite direction:

type FromEntityUnmarshaller[T] = Unmarshaller[HttpEntity, T]
type ToEntityMarshaller[T] = Marshaller[T, MessageEntity]

Since Akka-http's Json support follows the type-class pattern, let us try plugging in another Json library, such as Json4s. We need to add the following dependencies to build.sbt:

"de.heikoseeberger" %% "akka-http-json4s" % "1.19.0-M2",
"org.json4s" %% "json4s-jackson" % "3.6.0-M1",
"org.json4s" %% "json4s-ext" % "3.6.0-M1",

akka-http-json4s provides the Json4s integration through the trait Json4sSupport:

trait Json4sSupport {
  ...
  /**
   * HTTP entity => `A`
   *
   * @tparam A type to decode
   * @return unmarshaller for `A`
   */
  implicit def unmarshaller[A: Manifest](implicit serialization: Serialization,
                                         formats: Formats): FromEntityUnmarshaller[A] = ...

  /**
   * `A` => HTTP entity
   *
   * @tparam A type to encode, must be upper bounded by `AnyRef`
   * @return marshaller for any `A` value
   */
  implicit def marshaller[A <: AnyRef](implicit serialization: Serialization,
                                       formats: Formats,
                                       shouldWritePretty: ShouldWritePretty =
                                         ShouldWritePretty.False): ToEntityMarshaller[A] = ...
}

It likewise provides implicit instances of FromEntityUnmarshaller[A] and ToEntityMarshaller[A]. Serialization provides the concrete Json read and write functions:

trait Serialization {
  import java.io.{Reader, Writer}

  /** Serialize to String.
   */
  def write[A <: AnyRef](a: A)(implicit formats: Formats): String
  ...
  /** Deserialize from a String.
   */
  def read[A](json: String)(implicit formats: Formats, mf: Manifest[A]): A = read(StringInput(json))
  ...
}

Formats is the set of all predefined Json conversions provided by Json4s:

trait Formats extends Serializable { self: Formats =>
  ...
  def withBigInt: Formats = copy(wWantsBigInt = true)
  def withLong: Formats = copy(wWantsBigInt = false)
  def withBigDecimal: Formats = copy(wWantsBigDecimal = true)
  ...
}

So it appears that all we need to do is provide implicit instances of Serialization and Formats in implicit scope:

import de.heikoseeberger.akkahttpjson4s.Json4sSupport
import org.json4s.jackson

trait JsonCodec extends Json4sSupport {
  import org.json4s.DefaultFormats
  import org.json4s.ext.JodaTimeSerializers
  implicit val serialization = jackson.Serialization
  implicit val formats = DefaultFormats ++ JodaTimeSerializers.all
}

object JsConverters extends JsonCodec

A concrete example:

import scala.collection.mutable._

case class User(id: Int, name: String)
class Item(id: Int, name: String, price: Double)
object AnyPic {
  val area = 10
  val title = "a picture"
  val data = ArrayBuffer[Byte](1,2,3)
}

val john = Marshal(User(1,"John")).to[MessageEntity]
val fruit = Marshal(new Item(1,"banana", 3.5)).to[MessageEntity]
val pic = Marshal(AnyPic).to[MessageEntity]

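Not only does this do away with the repetitive jsonFormatXX declarations, it is also more flexible: we are no longer limited to case classes as the only kind of user-defined type, and classes, objects and so on are all accepted without extra code. Since Json4s works by reflection, it is worth checking the Json actually produced for types that are not case classes.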

Below is the source code for the examples discussed in this article:

build.sbt

name := "learn-http"

version := "0.1"

scalaVersion := "2.12.3"

libraryDependencies ++= Seq(
  "de.heikoseeberger" %% "akka-http-json4s" % "1.19.0-M2",
  "org.json4s" %% "json4s-jackson" % "3.6.0-M1",
  "org.json4s" %% "json4s-ext" % "3.6.0-M1",
  "com.typesafe.akka" %% "akka-http" % "10.0.10",
  "com.typesafe.akka" %% "akka-actor" % "2.5.4",
  "com.typesafe.akka" %% "akka-stream" % "2.5.4",
  "com.typesafe.akka" %% "akka-http-spray-json" % "10.0.10"
)

Marshalling

import akka.actor._
import akka.stream._
import akka.util.ByteString
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.marshallers.sprayjson._
import spray.json._

trait Formats extends SprayJsonSupport with DefaultJsonProtocol

object Converters extends Formats {
  case class User(id: Int, name: String)
  case class Item(id: Int, name: String, price: Double)
  implicit val itemFormat = jsonFormat3(Item.apply)
  implicit val userFormat = jsonFormat2(User.apply)
}

object Marshalling {
  import Converters._

  implicit val httpSys = ActorSystem("httpSystem")
  implicit val httpMat = ActorMaterializer()
  implicit val httpEC = httpSys.dispatcher

  val string = "Yeah"
  val entityFuture = Marshal(string).to[MessageEntity]

  val errorMsg = "Easy, pal!"
  val responseFuture = Marshal(420 -> errorMsg).to[HttpResponse]

  val request = HttpRequest(headers = List(headers.Accept(MediaTypes.`application/json`)))
  val responseText = "Plaintext"
  val respFuture = Marshal(responseText).toResponseFor(request)

  // val bsFuture = Marshal("oh my!").to[ByteString]
  // val reqFuture = Marshal(400).to[HttpRequest]
  // val resp = reqFuture.flatMap {r => Marshal("ok").toResponseFor(r)}

  val john = Marshal(User(1,"John")).to[MessageEntity]
  val fruit = Marshal(Item(1,"banana", 3.5)).to[MessageEntity]

  val route =
    get {
      path("items") {
        complete(fruit)
      } ~
      path("users") {
        complete(john)
      }
    }
}

 Json4sMarshalling

import akka.actor._
import akka.stream._
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import de.heikoseeberger.akkahttpjson4s.Json4sSupport
import org.json4s.jackson

trait JsonCodec extends Json4sSupport {
  import org.json4s.DefaultFormats
  import org.json4s.ext.JodaTimeSerializers
  implicit val serialization = jackson.Serialization
  implicit val formats = DefaultFormats ++ JodaTimeSerializers.all
}

object JsConverters extends JsonCodec

object Json4sMarshalling {
  import JsConverters._

  implicit val httpSys = ActorSystem("httpSystem")
  implicit val httpMat = ActorMaterializer()
  implicit val httpEC = httpSys.dispatcher

  val string = "Yeah"
  val entityFuture = Marshal(string).to[MessageEntity]

  val errorMsg = "Easy, pal!"
  val responseFuture = Marshal(420 -> errorMsg).to[HttpResponse]

  val request = HttpRequest(headers = List(headers.Accept(MediaTypes.`application/json`)))
  val responseText = "Plaintext"
  val respFuture = Marshal(responseText).toResponseFor(request)

  import scala.collection.mutable._

  case class User(id: Int, name: String)
  class Item(id: Int, name: String, price: Double)
  object AnyPic {
    val area = 10
    val title = "a picture"
    val data = ArrayBuffer[Byte](1,2,3)
  }

  val john = Marshal(User(1,"John")).to[MessageEntity]
  val fruit = Marshal(new Item(1,"banana", 3.5)).to[MessageEntity]
  val pic = Marshal(AnyPic).to[MessageEntity]

  val route =
    get {
      path("items") {
        complete(fruit)
      } ~
      path("users") {
        complete(john)
      } ~
      path("pic") {
        complete(pic)
      }
    }
}
