scala 2.11 - How to recover from akka.stream.io.Framing$FramingException


Running on: akka-stream-experimental_2.11 1.0.

We are using Framing.delimiter in a TCP server. When a message arrives whose length is greater than maximumFrameLength, a FramingException is thrown, which we capture in the OnError of the ActorSubscriber.

Server code:

def bind(address: String, port: Int, target: ActorRef, maxInFlight: Int, maxFrameLength: Int)
        (implicit system: ActorSystem, actorMaterializer: ActorMaterializer): Future[ServerBinding] = {
  val sink = Sink.foreach {
    conn: Tcp.IncomingConnection =>
      val targetSubscriber = ActorSubscriber[Message](system.actorOf(Props(new TargetSubscriber(target, maxInFlight))))

      val targetSink = Flow[ByteString]
        .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = maxFrameLength, allowTruncation = true))
        .map(raw ⇒ Message(raw))
        .to(Sink(targetSubscriber))

      conn.flow.to(targetSink).runWith(Source(Promise().future))
  }
  val connections = Tcp().bind(address, port)
  connections.to(sink).run()
}

Subscriber code:

class TargetSubscriber(target: ActorRef, maxInFlight: Int) extends ActorSubscriber with ActorLogging {
  private var inFlight = 0

  override protected def requestStrategy = new MaxInFlightRequestStrategy(maxInFlight) {
    override def inFlightInternally = inFlight
  }

  override def receive = {
    case OnNext(msg: Message) ⇒
      target ! msg
      inFlight += 1
    case OnError(t) ⇒
      inFlight -= 1
      log.error(t, "subscriber encountered error")
    case TargetAck(_) ⇒
      inFlight -= 1
  }
}

Problem: after the exception, messages under the max frame length no longer flow for that incoming connection. Killing the client and re-running it works fine.

ActorSubscriber does not honor supervision.

What is the correct way to skip the bad message and continue with the next message?
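To make "skip the bad message and continue" concrete, here is a minimal sketch in plain Scala, with no Akka involved. It is not the Akka API and not a confirmed fix: `LenientFraming`, `Result`, and `split` are hypothetical names, and it works on `String` rather than `ByteString` to stay self-contained. It only shows the behaviour being asked for: oversized frames are counted and dropped instead of failing the whole stream.

```scala
// Hypothetical helper, not part of Akka: splits buffered text into
// newline-delimited frames and drops any frame longer than maxFrameLength
// instead of raising an error.
object LenientFraming {
  // Outcome of feeding one chunk: the complete frames within the limit,
  // how many oversized frames were dropped, and the unterminated tail
  // that must be carried over to the next call.
  final case class Result(frames: Vector[String], dropped: Int, remainder: String)

  def split(buffered: String, chunk: String, maxFrameLength: Int): Result = {
    val data = buffered + chunk
    // A limit of -1 keeps trailing empty strings, so the last element is
    // always the (possibly empty) text after the final delimiter.
    val parts = data.split("\n", -1)
    val complete = parts.init.toVector
    val (ok, tooLong) = complete.partition(_.length <= maxFrameLength)
    Result(ok, tooLong.size, parts.last)
  }
}
```

With a limit of 5, feeding `"ok\nxxxxxxxxxx\nfine\npar"` yields the frames `ok` and `fine`, one dropped frame, and the remainder `par`, which is prepended to the next chunk. One limitation of this sketch: the remainder is unbounded while an oversized frame is still waiting for its delimiter, so a real implementation would also need to cap it.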

Have you tried putting supervision on the targetFlow sink instead of on the whole materializer? I don't see it anywhere here, and I believe it should be set on the flow directly.

This is still more of a guess than science ;)
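The suggestion above, attaching a supervision strategy to the flow itself, might look like the following sketch against akka-stream-experimental 1.0. `SupervisedFraming`, `decider`, and `framed` are hypothetical names. Whether the framing stage actually honors this attribute is not confirmed by anything in this thread, so treat it as something to try rather than a known fix.

```scala
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.io.Framing
import akka.stream.scaladsl.Flow
import akka.util.ByteString

// Hypothetical wrapper object, for illustration only.
object SupervisedFraming {
  // Resume (drop the offending element) on framing errors, stop otherwise.
  val decider: Supervision.Decider = {
    case _: Framing.FramingException => Supervision.Resume
    case _                           => Supervision.Stop
  }

  // Same framing as in the question, with the supervision strategy set
  // as an attribute on the flow rather than on the materializer.
  def framed(maxFrameLength: Int) =
    Flow[ByteString]
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = maxFrameLength, allowTruncation = true))
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
}
```

In the server code, `SupervisedFraming.framed(maxFrameLength)` would replace the `Flow[ByteString].via(Framing.delimiter(...))` part of `targetSink`, followed by the same `.map` and `.to` steps.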

