package uk.org.floop.msc.wview

import scala.actors.Actor
import scala.actors.Actor._
import scala.collection.mutable.ListBuffer

import net.liftweb.http.LiftRules

import java.net.{InetSocketAddress, SocketTimeoutException, Socket}
import java.io.{IOException, DataInputStream, InputStream}
import java.nio.channels.{SocketChannel, ClosedByInterruptException}
import java.nio.{ByteBuffer, ByteOrder}
import java.util.Date

import uk.org.floop.msc.rrd._

/**
 * Actor that reads loop packets from a TCP feed and forwards the decoded
 * field values to `DataStore` as `StorePacket` messages.
 *
 * The actor connects to 10.79.0.6:11011, synchronises on the
 * `LoopPacket.START_FRAME` marker, reads `LoopPacket.SIZE` bytes, decodes
 * them according to `LoopPacket.fields` and repeats. Any `IOException`
 * (including a 30 second read timeout) closes the socket and triggers a
 * reconnect after a holdoff delay.
 */
object DataCollector extends Actor {

  /**
   * Runs the connect / read / decode loop forever.
   *
   * The holdoff starts at 1000 ms. After each failure the actor sleeps for
   * the current holdoff, then doubles it as long as it is below 60000 ms.
   * It is reset to 1000 ms every time a packet has been forwarded.
   */
  def act() {
    var holdoff = 1000
    // One buffer is reused for every packet; it is fully overwritten by readPacket.
    val buffer = new Array[Byte](LoopPacket.SIZE)
    //LiftRules.unloadHooks.append{() => continue}
    while (true) {
      var sock: Socket = null
      try {
        sock = new Socket("10.79.0.6", 11011)
        // A read that blocks for more than 30 s raises SocketTimeoutException,
        // which is an IOException and is therefore handled below.
        sock.setSoTimeout(30000)
        val is = sock.getInputStream
        while (true) {
          awaitStartFrame(is)
          readPacket(is, buffer)
          DataStore ! StorePacket(decodePacket(buffer))
          holdoff = 1000
        }
      } catch {
        case ex: IOException => // includes timeout
          println("DataCollector exception: " + ex)
          Thread.sleep(holdoff)
          if (holdoff < 60000) {
            holdoff = holdoff * 2
          }
      } finally {
        if ((sock != null) && (!sock.isClosed)) {
          sock.close()
        }
      }
    }
  }

  /**
   * Consumes bytes from `is` until 8 consecutive bytes have matched
   * `LoopPacket.START_FRAME`. A mismatching byte restarts the match at
   * position 0.
   *
   * @throws IOException if the stream ends before the marker is complete
   */
  private def awaitStartFrame(is: InputStream) {
    var matched = 0
    while (matched < 8) {
      val res = is.read
      if (res == -1)
        throw new IOException("Unexpected end of stream.")
      else if (res.toByte == LoopPacket.START_FRAME(matched)) {
        matched = matched + 1
      } else {
        matched = 0
      }
    }
  }

  /**
   * Fills `buffer` with exactly `LoopPacket.SIZE` bytes from `is`, looping
   * because a single `read` may return fewer bytes than requested.
   *
   * @throws IOException if the stream ends before the packet is complete
   */
  private def readPacket(is: InputStream, buffer: Array[Byte]) {
    var bytesRead = 0
    while (bytesRead < LoopPacket.SIZE) {
      val res = is.read(buffer, bytesRead, buffer.length - bytesRead)
      if (res == -1)
        throw new IOException("Unexpected end of stream.")
      bytesRead = bytesRead + res
    }
  }

  /**
   * Decodes one packet into (field name, value) pairs, reading the fields
   * sequentially in the order given by `LoopPacket.fields`.
   *
   * Value types produced: FLOAT -> Float, USHORT/SHORT -> Short,
   * UCHAR -> Byte, TIME_T -> Option[Date] (None when the raw value is 0).
   */
  private def decodePacket(packet: Array[Byte]): List[Pair[String, Any]] = {
    val bb = ByteBuffer.wrap(packet)
    bb.order(ByteOrder.BIG_ENDIAN)
    val values = new ListBuffer[Pair[String, Any]]()
    for ((valueType, field) <- LoopPacket.fields) {
      valueType match {
        case VALUE_TYPE.FLOAT =>
          // FLOAT fields are read little-endian; everything else big-endian.
          bb.order(ByteOrder.LITTLE_ENDIAN)
          val raw = bb.getFloat.toLong
          bb.order(ByteOrder.BIG_ENDIAN)
          values += ((field, decodeFixedPoint(raw)))
        case VALUE_TYPE.USHORT =>
          // NOTE(review): getShort is signed, so raw values above 32767 come
          // out negative. Left as-is because consumers of StorePacket may
          // rely on the Short runtime type -- confirm before widening to Int.
          values += ((field, bb.getShort))
        case VALUE_TYPE.TIME_T =>
          values += ((field, bb.getInt() match {
            case 0 => None
            case x => Some(new Date(x.toLong * 1000)) // seconds -> milliseconds
          }))
        case VALUE_TYPE.SHORT =>
          values += ((field, bb.getShort()))
        case VALUE_TYPE.UCHAR =>
          // NOTE(review): get() is signed, so raw values above 127 come out
          // negative -- same caveat as USHORT.
          values += ((field, bb.get()))
      }
    }
    values.toList
  }

  /**
   * Unpacks a sign-magnitude 16.16 fixed-point number: bit 31 is the sign,
   * bits 16..30 the whole part and bits 0..15 the fraction in units of
   * 1/65536.
   *
   * NOTE(review): the caller obtains `raw` via `getFloat.toLong`, i.e. it
   * presumes the sender transmits the packed integer as a float's numeric
   * value -- confirm against the feed's sender.
   */
  private def decodeFixedPoint(raw: Long): Float = {
    val whole = (raw >> 16) & 0x7fff
    // 16 fraction bits => divide by 2^16 (was mistyped as 63365).
    val fraction = (raw & 0xffff) / 65536.0f
    val magnitude = whole + fraction
    if (((raw >> 31) & 0x1) == 0x1) {
      -magnitude
    } else {
      magnitude
    }
  }
}