package uk.org.floop.msc.wview

import scala.actors.Actor
import scala.actors.Actor._
import scala.collection.mutable.ListBuffer
import net.liftweb.http.LiftRules
import java.net.{InetSocketAddress, SocketTimeoutException, Socket}
import java.io.{IOException, DataInputStream}
import java.nio.channels.{SocketChannel, ClosedByInterruptException}
import java.nio.ByteBuffer
import java.util.Date
import uk.org.floop.msc.rrd._

/**
 * Actor that connects to a TCP data feed, decodes each packet according to
 * `LoopPacket.fields` and forwards the decoded values to `DataStore` as a
 * `StorePacket`.
 *
 * On any `IOException` (including read/connect timeouts) the connection is
 * dropped and re-established after a hold-off that doubles on every
 * consecutive failure and is reset after each successfully decoded packet.
 *
 * The actor stops once the Lift unload hook registered in `act()` has run.
 */
object DataCollector extends Actor {

  private val Host = "10.79.0.6"
  private val Port = 11011

  /** Timeout, in milliseconds, applied to both connect and blocking reads. */
  private val SocketTimeoutMs = 30000

  /** Hold-off before reconnecting after the first failure, in milliseconds. */
  private val InitialHoldoffMs = 1000

  /** The hold-off stops doubling once it has reached this value. */
  private val HoldoffDoublingLimitMs = 60000

  /** Number of start-frame bytes that must match before a packet is read. */
  private val StartFrameLength = 8

  /** Cleared by the unload hook; checked by both loops in `act()`. */
  @volatile private var running = true

  /** Socket currently in use, so that `shutdown()` can unblock a pending read. */
  @volatile private var activeSocket: Option[Socket] = None

  /**
   * Main loop: connect, decode packets until an I/O error occurs, back off,
   * reconnect. Returns (ending the actor) once `shutdown()` has been called.
   */
  def act(): Unit = {
    var holdoff = InitialHoldoffMs
    // Previously the hook invoked Actor.continue, which only throws in the
    // thread running the hook and never stopped this loop.
    LiftRules.addUnloadHook(() => shutdown())
    while (running) {
      val sock = new Socket()
      activeSocket = Some(sock)
      try {
        // Explicit connect so that a dead host cannot block for the OS default.
        sock.connect(new InetSocketAddress(Host, Port), SocketTimeoutMs)
        sock.setSoTimeout(SocketTimeoutMs)
        val in = new DataInputStream(sock.getInputStream)
        while (running) {
          awaitStartFrame(in)
          DataStore ! StorePacket(readPacket(in))
          holdoff = InitialHoldoffMs
        }
      } catch {
        case ex: IOException => // includes timeout
          // An exception after shutdown() is the expected result of the socket
          // being closed underneath the read; do not log or back off for it.
          if (running) {
            println("DataCollector exception: " + ex)
            Thread.sleep(holdoff)
            if (holdoff < HoldoffDoublingLimitMs) {
              holdoff = holdoff * 2
            }
          }
      } finally {
        activeSocket = None
        closeQuietly(sock)
      }
    }
  }

  /** Asks the collector to stop and closes the live socket to unblock any read. */
  private def shutdown(): Unit = {
    running = false
    activeSocket.foreach(closeQuietly)
  }

  /**
   * Consumes bytes until the first `StartFrameLength` bytes of
   * `LoopPacket.START_FRAME` have been seen consecutively.
   */
  private def awaitStartFrame(in: DataInputStream): Unit = {
    var matched = 0
    while (matched < StartFrameLength) {
      val b = in.readByte()
      if (b == LoopPacket.START_FRAME(matched)) {
        matched = matched + 1
      } else if (b == LoopPacket.START_FRAME(0)) {
        // The mismatching byte may itself open a new start frame; do not drop it.
        matched = 1
      } else {
        matched = 0
      }
    }
  }

  /**
   * Reads one value per entry of `LoopPacket.fields`, in order, and returns the
   * (field name, value) pairs. A TIME_T of 0 becomes `None`; any other value is
   * treated as seconds and converted to `Some(Date)`.
   *
   * A field type not listed below raises a `MatchError`, which `act()` does not
   * catch.
   *
   * NOTE(review): USHORT and UCHAR are read with the signed `readShort` /
   * `readByte`, so values above 32767 / 127 come out negative. Left as-is
   * because consumers of `StorePacket` may depend on the boxed Short/Byte
   * types -- confirm, then consider `readUnsignedShort` / `readUnsignedByte`.
   */
  private def readPacket(in: DataInputStream): List[(String, Any)] = {
    val values = new ListBuffer[(String, Any)]()
    var index = 0
    while (index < LoopPacket.fields.length) {
      val entry: (String, Any) = LoopPacket.fields(index) match {
        case (VALUE_TYPE.FLOAT, field) => (field, in.readFloat())
        case (VALUE_TYPE.USHORT, field) => (field, in.readShort())
        case (VALUE_TYPE.TIME_T, field) =>
          (field, in.readInt() match {
            case 0 => None
            case x => Some(new Date(x.toLong * 1000))
          })
        case (VALUE_TYPE.SHORT, field) => (field, in.readShort())
        case (VALUE_TYPE.UCHAR, field) => (field, in.readByte())
      }
      values += entry
      index = index + 1
    }
    values.toList
  }

  /** Closes the socket if still open, ignoring any I/O error from the close. */
  private def closeQuietly(sock: Socket): Unit = {
    try {
      if (!sock.isClosed) {
        sock.close()
      }
    } catch {
      case _: IOException => ()
    }
  }
}