Skip to content
This repository has been archived by the owner on Jul 25, 2024. It is now read-only.

Provide a sample application #14

Open
domdorn opened this issue May 6, 2015 · 11 comments
Open

Provide a sample application #14

domdorn opened this issue May 6, 2015 · 11 comments

Comments

@domdorn
Copy link

domdorn commented May 6, 2015

I'm trying to integrate this project into my project but I'm a little stuck...

Is there a simple way to make my (existing) events (case classes) be serialized ?

I added the SprayJsonSerializer to my project, added the required configuration files.. now I'm stuck on how to change my existing events so they get handled correct by this plugin.

My approach was that my Events would simply extend eventstore.EventData, but that does not work as its forbidden to extend case classes...

package com.dominikdorn.akkapersistence.messages

import java.util.UUID

import eventstore.{ContentType, Event, Content, EventData}

case class Cmd(msg: String)

case class Evt(_type: String, msg: String)  extends EventData (
    eventType = _type,
    UUID.randomUUID(),
    Content(akka.util.ByteString.fromString(msg), ContentType.Json),
    metadata = Content()
  )

Is there another way? I don't want to implement a SprayJsonSerializer for every event type I have in my system... maybe providing a implicit MyEvent -> JsValue converter?

A sample which illustrates how to really use this akka persistence plugin would greatly help.

Thanks,
Dominik

@t3hnar
Copy link
Contributor

t3hnar commented May 6, 2015

Hi @domdorn, there is an example
basically you need to provide mapping to each event.

@domdorn
Copy link
Author

domdorn commented May 7, 2015

@t3hnar as noted in the first post, I've seen the sample. However, its unreasonable to copy this whole code for every event-type received/stored by an akka-persistence actor. There needs to be an easier way for this than creating dozens of classes with basically identical contents.

Is there a real sample application somewhere available? Or is this repository/project more a proof of concept ?

@t3hnar
Copy link
Contributor

t3hnar commented May 7, 2015

This repository project is not a proof of concept, it is fully functional plugin.

Those examples are provided to show how to store events as json in eventstore and not in binary which is used by akka by default. This project is not intended to reinvent serialization/deserialization approaches used by developers.

Regarding your code snippet, I don't understand why did you extends EventData (there is nothing similar in examples) as well as don't understand why do you need to copy code for each event. All you need is to have something that can
Json => T and T => Json and pass Json to EventData as well as read it from there.

I'd recommend to start from looking on https://github.com/EventStore/EventStore.Akka.Persistence/blob/master/src/test/scala/akka/persistence/eventstore/SprayJsonSerializer.scala#L108
I hardcoded to have it as String in example, here is where your T => Json should go.
For reads: https://github.com/EventStore/EventStore.Akka.Persistence/blob/master/src/test/scala/akka/persistence/eventstore/SprayJsonSerializer.scala#L103

@t3hnar
Copy link
Contributor

t3hnar commented May 7, 2015

Also you should remember that akka wraps each your event in to PersistentRepr which then converted to json and passed to EventData

@petervdm
Copy link

petervdm commented Sep 1, 2015

I have followed the guidance to persist the events as json and have a few questions:

My event is a VehicleInitialized event. I see the PersistentRepr json representation in the eventstore browser when I click on the json link for this particular event. It looks like this:

Data
{
  "payload": "{\"regNumber\":\"123\",\"color\":\"red\"}",
  "sequenceNr": 1,
  "persistenceId": "vehicle-f06c85e9-e5da-443a-ba84-cf1f2a6c19ee",
  "deleted": false,
  "sender": "akka://seed-actor-system/user/seed-service/$i#-1797565315"
}

and the type of the event is akka.persistence.PersistentRepr.

I was hoping for the event type to be namespace.VehicleInitialized and the json to be:

{
   "regNumber": "123",
   "color": "red"
}

Using akka-persistence without this plugin does list my different event types as the event type and not PersistentRepr, but naturally, stored in binary.

Does akka-persistence require me to persist PersistenceRepr?
Surely having just my event data persisted will make querying and projections a lot easier?

@t3hnar
Copy link
Contributor

t3hnar commented Sep 1, 2015

@petervdm

Does akka-persistence require me to persist PersistenceRepr?

Yes it does.

Surely having just my event data persisted will make querying and projections a lot easier?

You can achieve this with writing PersistentRepr part to EventData.metadata and then your own payload to EventData.data

@petervdm
Copy link

petervdm commented Sep 1, 2015

Ok, that's a great idea. That is probably the best place for that akka-persistence information, or any other producer information for that matter. Keep the EventData.data clean and describe it in metadata.

@NayabSiddiqui
Copy link

Hi, I've recently started looking into using this project with my application. However , I'm unable to locate any example solutions. All the above mentioned links are resulting in 404. Is this repo not being looked after any more ?
Thanks in advabce

@pawelkaczor
Copy link
Contributor

@NayabSiddiqui checkout Akka-DDD.

It contains eventstore-akka-persistence module: https://github.com/pawelkaczor/akka-ddd/tree/master/eventstore-akka-persistence/src/main/scala/pl/newicom/eventstore

@t3hnar
Copy link
Contributor

t3hnar commented May 18, 2017

@NayabSiddiqui thanks for noticing, I've fixed these links

@NayabSiddiqui
Copy link

Thanks @pawelkaczor , @t3hnar ,
I had a look at the examples and wrote my custom serializer as below :

class EventStoreJsonSerializer(val system: ExtendedActorSystem) extends EventStoreSerializer {
  import EventStoreJsonSerializer._

  implicit val formats = DefaultFormats + SnapshotSerializer + new PersistentReprSerializer(system) + ActorRefSerializer ++ JodaTimeSerializers.all

  def identifier = Identifier

  def includeManifest = true

  def fromBinary(bytes: Array[Byte], manifestOpt: Option[Class[_]]) = {
    implicit val manifest = manifestOpt match {
      case Some(x) => Manifest.classType(x)
      case None    => Manifest.AnyRef
    }
    read(new String(bytes, UTF8))
  }

  def toBinary(o: AnyRef) = write(o).getBytes(UTF8)

  def toEvent(x: AnyRef) = x match {
    case x: PersistentRepr => EventData(
      eventType = classFor(x).getName,
      data = Content(ByteString(toBinary(x)), ContentType.Json)
    )

    case x: SnapshotEvent => EventData(
      eventType = classFor(x).getName,
      data = Content(ByteString(toBinary(x)), ContentType.Json)
    )

    case _ => sys.error(s"Cannot serialize $x, SnapshotEvent expected")
  }

  def fromEvent(event: Event, manifest: Class[_]) = {
    val clazz = Class.forName(event.data.eventType)
    val result = fromBinary(event.data.data.value.toArray, clazz)
    if (manifest.isInstance(result)) result
    else sys.error(s"Cannot deserialize event as $manifest, event: $event")
  }

  def classFor(x: AnyRef) = x match {
    case x: PersistentRepr => classOf[PersistentRepr]
    case _                 => x.getClass
  }

  object ActorRefSerializer extends Serializer[ActorRef] {
    val Clazz = classOf[ActorRef]

    def deserialize(implicit format: Formats) = {
      case (TypeInfo(Clazz, _), JString(x)) => system.provider.resolveActorRef(x)
    }

    def serialize(implicit format: Formats) = {
      case x: ActorRef => JString(x.path.toSerializationFormat)
    }
  }
}

object EventStoreJsonSerializer {
  val UTF8: Charset = Charset.forName("UTF-8")
  val Identifier: Int = ByteBuffer.wrap("json4s".getBytes(UTF8)).getInt

  object SnapshotSerializer extends Serializer[Snapshot] {
    val Clazz = classOf[Snapshot]

    def deserialize(implicit format: Formats) = {
      case (TypeInfo(Clazz, _), JObject(List(
      JField("data", JString(x)),
      JField("metadata", metadata)))) => Snapshot(x, metadata.extract[SnapshotMetadata])
    }

    def serialize(implicit format: Formats) = {
      case Snapshot(data, metadata) => JObject("data" -> JString(data.toString), "metadata" -> decompose(metadata))
    }
  }

  class PersistentReprSerializer(system: ExtendedActorSystem) extends Serializer[PersistentRepr] {
    val Clazz = classOf[PersistentRepr]

    def deserialize(implicit format: Formats) = {
      case (TypeInfo(Clazz, _), json) =>
        val x = json.extract[Mapping]

        val payload = x.manifest match {
          case "event.EmployeeRegistered" => read[EmployeeRegistered](x.payload)
          case "event.LeavesCredited" => read[LeavesCredited](x.payload)
          case "event.FullDayLeavesApplied" => read[FullDayLeavesApplied](x.payload)
          case "event.HalfDayLeavesApplied" => read[HalfDayLeavesApplied](x.payload)
        }
        PersistentRepr(
          payload = payload,
          sequenceNr = x.sequenceNr,
          persistenceId = x.persistenceId,
          manifest = x.manifest,
          writerUuid = x.writerUuid
        )
    }
    def serialize(implicit format: Formats) = {
      case x: PersistentRepr =>
        val payload = x.payload match {
          case e:EmployeeEvent => write(e)
        }
        val mapping = Mapping(
          payload = payload,
          sequenceNr = x.sequenceNr,
          persistenceId = x.persistenceId,
          manifest = x.payload.getClass.getName,
          writerUuid = x.writerUuid
        )
        decompose(mapping)
    }
  }

  case class Mapping(
                      payload:       String,
                      sequenceNr:    Long,
                      persistenceId: String,
                      manifest:      String,
                      writerUuid:    String
                    )
}

My events used are as follows :

sealed trait EmployeeEvent extends Event

case class EmployeeRegistered(firstName: String, lastName: String) extends EmployeeEvent

case class LeavesCredited(creditedLeaves: Float) extends EmployeeEvent

case class FullDayLeavesApplied(from: DateTime, to: DateTime) extends EmployeeEvent

case class HalfDayLeavesApplied(from: DateTime, to: DateTime) extends EmployeeEvent

object EmployeeRegistered {
  implicit val employeeRegisteredFormat = Json.format[EmployeeRegistered]
}

object LeavesCredited {
  implicit val leavesCreditedFormat = Json.format[LeavesCredited]
}

object FullDayLeavesApplied {
  implicit val fullDayLeavesApplied = Json.format[FullDayLeavesApplied]
}

object HalfDayLeavesApplied {
  implicit val halfDayLeavesAppliedFormat = Json.format[HalfDayLeavesApplied]
}

When I run my tests, I see that for some streams, the event-type in EventStore is namespace.EventName and sometimes it is akka.persistence.PersistentRepr. I'm unable to get my head around this. I want to store it as namespace.EventName only.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants