Scala 2.11 - How to recover from akka.stream.io.Framing$FramingException
I am on akka-stream-experimental_2.11 1.0.
We are using Framing.delimiter in a TCP server. When a message arrives whose length is greater than maximumFrameLength, a FramingException is thrown, and we can capture it in the OnError of the ActorSubscriber.
Server code:

    def bind(address: String, port: Int, target: ActorRef, maxInFlight: Int, maxFrameLength: Int)
            (implicit system: ActorSystem, actorMaterializer: ActorMaterializer): Future[ServerBinding] = {
      val sink = Sink.foreach { conn: Tcp.IncomingConnection =>
        val targetSubscriber = ActorSubscriber[Message](system.actorOf(Props(new TargetSubscriber(target, maxInFlight))))
        val targetSink = Flow[ByteString]
          .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = maxFrameLength, allowTruncation = true))
          .map(raw ⇒ Message(raw))
          .to(Sink(targetSubscriber))
        conn.flow.to(targetSink).runWith(Source(Promise().future))
      }
      val connections = Tcp().bind(address, port)
      connections.to(sink).run()
    }
Subscriber code:

    class TargetSubscriber(target: ActorRef, maxInFlight: Int) extends ActorSubscriber with ActorLogging {
      private var inFlight = 0

      override protected def requestStrategy = new MaxInFlightRequestStrategy(maxInFlight) {
        override def inFlightInternally = inFlight
      }

      override def receive = {
        case OnNext(msg: Message) ⇒
          target ! msg
          inFlight += 1
        case OnError(t) ⇒
          inFlight -= 1
          log.error(t, "subscriber encountered error")
        case TargetAck(_) ⇒
          inFlight -= 1
      }
    }
Problem: after the exception, messages that are under the max frame length no longer flow for that incoming connection. Killing the client and re-running it works fine.
The ActorSubscriber does not honor supervision.
What is the correct way to skip the bad message and continue with the next message?
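To make the desired behaviour concrete, here is a minimal sketch in plain Scala (no Akka involved) of delimiter framing that drops an oversized frame instead of failing. It is not how Framing.delimiter is implemented. The object SkipOversizedFrames and its split method are hypothetical names, and strings stand in for ByteString to keep the example short.

```scala
object SkipOversizedFrames {
  /** Splits `buffer` on '\n'. Frames longer than `maxFrameLength` are
    * dropped rather than treated as an error. Returns the kept frames
    * and the trailing, not yet terminated remainder.
    * Hypothetical helper, for illustration only. */
  def split(buffer: String, maxFrameLength: Int): (List[String], String) = {
    val kept = List.newBuilder[String]
    var rest = buffer
    var idx = rest.indexOf('\n')
    while (idx >= 0) {
      val frame = rest.substring(0, idx)
      // Keep only frames within the limit; oversized ones are skipped.
      if (frame.length <= maxFrameLength) kept += frame
      rest = rest.substring(idx + 1)
      idx = rest.indexOf('\n')
    }
    (kept.result(), rest)
  }
}
```

For example, SkipOversizedFrames.split("ok\nwaytoolong\nhi\npart", 5) keeps "ok" and "hi", drops "waytoolong", and leaves "part" as the remainder. A streaming version would also have to cap the size of the remainder, otherwise a frame that never terminates could grow the buffer without bound.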
Have you tried to put supervision on the targetFlow sink instead of on the whole materialiser? I don't see it anywhere here, and I believe it should be set on the flow directly.
Still, this is more of a guess than science ;)
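A rough sketch of what setting supervision on the flow directly could look like in akka-stream 1.0, using ActorAttributes.supervisionStrategy. The object name FramingSupervisionSketch and the method framedLines are made up for illustration. Whether the framing stage actually honours Resume in this version is not confirmed by anything in this post, so treat it as something to try rather than a known fix.

```scala
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.io.Framing
import akka.stream.scaladsl.Flow
import akka.util.ByteString

object FramingSupervisionSketch {
  // Resume (drop the failing element) on framing errors, stop on anything else.
  val decider: Supervision.Decider = {
    case _: Framing.FramingException => Supervision.Resume
    case _                           => Supervision.Stop
  }

  // maxFrameLength plays the same role as in the question's bind method.
  // The supervision attribute is attached to this flow only, not to the
  // whole materializer.
  def framedLines(maxFrameLength: Int) =
    Flow[ByteString]
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = maxFrameLength, allowTruncation = true))
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
}
```

In the question's code this would replace the Flow[ByteString].via(Framing.delimiter(...)) part of targetSink, followed by the same .map and .to calls. If the stream still fails on an oversized frame, the stage is probably failing the stream directly instead of going through the supervision decider, and a custom framing stage that skips oversized frames would be the next thing to look at.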