// weather-servlet/src/main/scala/uk/org/floop/msc/wview/DataCollector.scala
package uk.org.floop.msc.wview

import scala.actors.Actor
import scala.actors.Actor._
import scala.collection.mutable.ListBuffer

import net.liftweb.http.LiftRules

import java.net.{InetSocketAddress, SocketTimeoutException, Socket}
import java.io.{IOException, DataInputStream, InputStream}
import java.nio.channels.{SocketChannel, ClosedByInterruptException}
import java.nio.{ByteBuffer, ByteOrder}
import java.util.Date

import uk.org.floop.msc.rrd._

/**
 * Actor that keeps a TCP connection open to the data feed, decodes every
 * loop packet it receives and forwards the decoded fields to `DataStore`
 * as a `StorePacket`.
 *
 * On any `IOException` (including read/connect timeouts) the socket is
 * closed and the connection is retried after an exponentially growing
 * hold-off, which is reset as soon as a packet has been handled.
 */
object DataCollector extends Actor {

  /** Host and port of the data feed. */
  private val FeedHost = "10.79.0.6"
  private val FeedPort = 11011

  /** Timeout (ms) applied both to connecting and to each blocking read. */
  private val SocketTimeoutMs = 30000

  /** Bounds (ms) of the reconnect hold-off. */
  private val InitialHoldoffMs = 1000
  private val MaxHoldoffMs = 60000

  /** Number of start-frame bytes that precede every packet. */
  private val StartFrameLength = 8

  /**
   * Runs the connect / read / reconnect loop. Never returns normally; only
   * `IOException`s are handled here, any other exception propagates.
   */
  def act(): Unit = {
    var holdoff = InitialHoldoffMs
    val buffer = new Array[Byte](LoopPacket.SIZE)
    //LiftRules.unloadHooks.append{() => continue}
    while (true) {
      var sock: Socket = null
      var failed = false
      try {
        // Create the socket unconnected so that the connection attempt itself
        // is bounded by a timeout and the finally block can always close it.
        sock = new Socket()
        sock.connect(new InetSocketAddress(FeedHost, FeedPort), SocketTimeoutMs)
        sock.setSoTimeout(SocketTimeoutMs)
        val in = sock.getInputStream
        while (true) {
          awaitStartFrame(in)
          readPacket(in, buffer)
          DataStore ! StorePacket(decodePacket(buffer))
          // A packet got through, so the link is healthy again.
          holdoff = InitialHoldoffMs
        }
      } catch {
        case ex: IOException => // includes timeout
          println("DataCollector exception: " + ex)
          failed = true
      } finally {
        if ((sock != null) && (!sock.isClosed)) {
          sock.close()
        }
      }
      if (failed) {
        // Back off only after the dead socket has been released.
        Thread.sleep(holdoff)
        val doubled = holdoff * 2
        holdoff = if (doubled > MaxHoldoffMs) MaxHoldoffMs else doubled
      }
    }
  }

  /**
   * Consumes bytes from `in` until the complete start frame has been seen.
   *
   * NOTE(review): a byte that breaks a partial match is discarded rather than
   * re-tested against the first start-frame byte, so a marker that begins on
   * that byte is missed and the next marker is waited for instead.
   *
   * @throws IOException if the stream ends before a start frame is found
   */
  private def awaitStartFrame(in: InputStream): Unit = {
    var matched = 0
    while (matched < StartFrameLength) {
      val res = in.read()
      if (res == -1)
        throw new IOException("Unexpected end of stream.")
      else if (res.toByte == LoopPacket.START_FRAME(matched))
        matched += 1
      else
        matched = 0
    }
  }

  /**
   * Fills `buffer` completely from `in`, looping over short reads.
   *
   * @throws IOException if the stream ends before the buffer is full
   */
  private def readPacket(in: InputStream, buffer: Array[Byte]): Unit = {
    var bytesRead = 0
    while (bytesRead < buffer.length) {
      val res = in.read(buffer, bytesRead, buffer.length - bytesRead)
      if (res == -1)
        throw new IOException("Unexpected end of stream.")
      bytesRead += res
    }
  }

  /**
   * Decodes one raw packet into (field name, value) pairs, reading the fields
   * in the order given by `LoopPacket.fields`.
   *
   * NOTE(review): USHORT and UCHAR are read with the signed `getShort`/`get`,
   * so large unsigned values come out negative. Widening them would change
   * the runtime type handed to DataStore, so this is left as-is - confirm
   * what the consumers expect.
   */
  private def decodePacket(buffer: Array[Byte]): List[(String, Any)] = {
    val values = new ListBuffer[(String, Any)]()
    val bb = ByteBuffer.wrap(buffer)
    bb.order(ByteOrder.BIG_ENDIAN)
    for ((valueType, field) <- LoopPacket.fields) {
      val value: Any = valueType match {
        case VALUE_TYPE.FLOAT =>
          // Float fields are read little-endian, everything else big-endian.
          // Read the raw 32 bits: interpreting them as an IEEE-754 float
          // first would destroy the fixed-point bit fields.
          bb.order(ByteOrder.LITTLE_ENDIAN)
          val bits = bb.getInt()
          bb.order(ByteOrder.BIG_ENDIAN)
          decodeFixedPoint(bits)
        case VALUE_TYPE.USHORT => bb.getShort()
        case VALUE_TYPE.TIME_T =>
          // Seconds since the epoch; zero means "no timestamp".
          bb.getInt() match {
            case 0 => None
            case x => Some(new Date(x.toLong * 1000))
          }
        case VALUE_TYPE.SHORT => bb.getShort()
        case VALUE_TYPE.UCHAR => bb.get()
      }
      values += ((field, value))
    }
    values.toList
  }

  /**
   * Converts a 32-bit fixed-point word to a float: bit 31 is the sign,
   * bits 30..16 the whole part and bits 15..0 the fraction in units of
   * 1/65536.
   *
   * NOTE(review): assumes the feed packs floats in this layout (the one the
   * decoding arithmetic is written for) - confirm against the sender.
   */
  private def decodeFixedPoint(bits: Int): Float = {
    val whole = (bits >> 16) & 0x7fff
    val fraction = (bits & 0xffff) / 65536.0f
    val magnitude = whole + fraction
    if ((bits >>> 31) == 1) -magnitude else magnitude
  }

}