package uk.org.floop.msc.wview import scala.actors.Actor import scala.collection.mutable.ListBuffer import java.net.{InetSocketAddress, SocketTimeoutException} import java.io.IOException import java.nio.channels.{SocketChannel, ClosedByInterruptException} import java.nio.ByteBuffer import java.util.Date import uk.org.floop.msc.rrd._ object DataCollector extends Actor { def act() { var continue = true var holdoff = 1000 while (continue) { var sc: SocketChannel = null try { var startFramePos = 0 var readPos = 0 var packetPos = 0 val values = new ListBuffer[Pair[String, Any]]() val sa = new InetSocketAddress("10.79.0.6", 11011) sc = SocketChannel.open() sc.socket.setSoTimeout(30000) sc.connect(sa) val bb = ByteBuffer.allocate(4096) while (sc.read(bb) > 0) { holdoff = 1000 if (startFramePos < 8) { // still reading start frame while ((startFramePos < 8) && (readPos < bb.position)) { if (bb.get(readPos) == LoopPacket.START_FRAME(startFramePos)) { readPos += 1 startFramePos += 1 } else { // start frame doesn't match; rewind readPos = readPos - startFramePos + 1 startFramePos = 0 } } } if (startFramePos == 8) { while ((packetPos < LoopPacket.fields.length) && (readPos <= (bb.position - VALUE_TYPE.sizeof(LoopPacket.fields(packetPos)._1)))) { LoopPacket.fields(packetPos) match { case (VALUE_TYPE.FLOAT, field) => values += (field, bb.getFloat(readPos)) case (VALUE_TYPE.USHORT, field) => values += (field, bb.getShort(readPos)) case (VALUE_TYPE.TIME_T, field) => values += (field, bb.getInt(readPos) match { case 0 => None case x => Some(new Date(x.toLong * 1000)) }) case (VALUE_TYPE.SHORT, field) => values += (field, bb.getShort(readPos)) case (VALUE_TYPE.UCHAR, field) => values += (field, bb.get(readPos)) } readPos += VALUE_TYPE.sizeof(LoopPacket.fields(packetPos)._1) packetPos += 1 } if (packetPos == LoopPacket.fields.length) { // read entire loop packet DataStore ! StorePacket(values.toList) values.clear() packetPos = 0 startFramePos = 0 if (readPos == bb.position) { // just reset the buffer bb.clear() } else { // move remaining bytes back to start System.arraycopy(bb.array, readPos, bb.array, 0, bb.position - readPos) bb.position(bb.position - readPos) } readPos = 0 } } } println("DataCollector: empty read, restarting.") Thread.sleep(holdoff) if (holdoff < 60000) { holdoff = holdoff * 2 } } catch { case ex: ClosedByInterruptException => continue = false // some other thread interrupted us, so don't continue println("DataCollector interrupted, closing down.") case ex: IOException => // includes SocketTimeoutException // could occur in sc.open or sc.read, either way, close up but continue (re-open, etc.) println("DataCollector exception: " + ex) Thread.sleep(holdoff) if (holdoff < 60000) { holdoff = holdoff * 2 } } finally { sc.close() } } } }