Newer
Older
weather-servlet / src / main / scala / uk / org / floop / msc / wview / DataCollector.scala
package uk.org.floop.msc.wview

import scala.actors.Actor
import scala.actors.Actor._
import scala.collection.mutable.ListBuffer

import net.liftweb.http.LiftRules

import java.net.{InetSocketAddress, SocketTimeoutException, Socket}
import java.io.{IOException, DataInputStream}
import java.nio.channels.{SocketChannel, ClosedByInterruptException}
import java.nio.ByteBuffer
import java.util.Date

import uk.org.floop.msc.rrd._

/**
 * Actor that keeps a TCP connection open to the weather data feed, decodes
 * each loop packet as described by `LoopPacket.fields`, and sends the decoded
 * values to `DataStore` wrapped in a `StorePacket`.
 *
 * On any `IOException` (connect failure, read timeout, end of stream) the
 * connection is closed and re-established after a back-off delay that starts
 * at one second, doubles after each consecutive failure, and is reset as soon
 * as a complete packet has been received.
 */
object DataCollector extends Actor {

  /** Number of bytes of `LoopPacket.START_FRAME` that mark the start of a packet. */
  private val StartFrameLength = 8

  /**
   * Main loop of the actor: connect, stream packets to `DataStore`, and
   * reconnect with back-off after an I/O failure. Does not return normally.
   */
  def act() {
    var holdoff = 1000
    // NOTE(review): `continue` is invoked on whichever thread runs the unload
    // hook, and nothing in the loop below checks a stop flag -- confirm this
    // really stops the collector when the web application is unloaded.
    LiftRules.addUnloadHook(() => continue)
    while (true) {
      var sock: Socket = null
      try {
        sock = new Socket("10.79.0.6", 11011)
        // A feed that stays silent for 30s is treated as a failed connection.
        sock.setSoTimeout(30000)
        val dis = new DataInputStream(sock.getInputStream)
        while (true) {
          awaitStartFrame(dis)
          DataStore ! StorePacket(readPacket(dis))
          // A full packet arrived, so the link is healthy again.
          holdoff = 1000
        }
      } catch {
        case ex: IOException => // includes timeout
          println("DataCollector exception: " + ex)
      } finally {
        closeQuietly(sock)
      }
      // Only reached after an IOException (the read loop never ends normally).
      // The socket is already closed here, so it is not held open while waiting.
      Thread.sleep(holdoff)
      if (holdoff < 60000) {
        holdoff = holdoff * 2
      }
    }
  }

  /**
   * Consumes bytes from `dis` until the most recent `StartFrameLength` bytes
   * equal `LoopPacket.START_FRAME`.
   *
   * The last bytes are kept in a sliding window and compared as a whole, so a
   * marker is still recognised when it begins part-way through a run of bytes
   * that looked like the beginning of a marker.
   */
  private def awaitStartFrame(dis: DataInputStream) {
    val window = new Array[Byte](StartFrameLength)
    var seen = 0
    var synced = false
    while (!synced) {
      // Drop the oldest byte and append the newest one.
      var k = 1
      while (k < StartFrameLength) {
        window(k - 1) = window(k)
        k = k + 1
      }
      window(StartFrameLength - 1) = dis.readByte()
      if (seen < StartFrameLength) {
        seen = seen + 1
      }
      // Never compare before the window holds only bytes actually received.
      if (seen == StartFrameLength) {
        var i = 0
        while (i < StartFrameLength && window(i) == LoopPacket.START_FRAME(i)) {
          i = i + 1
        }
        synced = (i == StartFrameLength)
      }
    }
  }

  /**
   * Reads one packet body from `dis`, decoding the fields in the order given
   * by `LoopPacket.fields`.
   *
   * @return the (field, value) pairs in packet order; a TIME_T field is
   *         `None` when the wire value is 0, otherwise `Some(Date)` built
   *         from the value interpreted as seconds
   */
  private def readPacket(dis: DataInputStream): List[Pair[String, Any]] = {
    val values = new ListBuffer[Pair[String, Any]]()
    // NOTE(review): USHORT and UCHAR are decoded with the signed
    // readShort/readByte, so wire values above 32767 / 127 come out negative
    // if those fields are really unsigned. readUnsignedShort/readUnsignedByte
    // would fix that but yield Int values -- confirm what DataStore expects
    // before changing it.
    LoopPacket.fields.foreach {
      case (VALUE_TYPE.FLOAT, field)  => values += ((field, dis.readFloat()))
      case (VALUE_TYPE.USHORT, field) => values += ((field, dis.readShort()))
      case (VALUE_TYPE.TIME_T, field) => values += ((field, dis.readInt() match {
        case 0 => None
        case x => Some(new Date(x.toLong * 1000))
      }))
      case (VALUE_TYPE.SHORT, field)  => values += ((field, dis.readShort()))
      case (VALUE_TYPE.UCHAR, field)  => values += ((field, dis.readByte()))
    }
    values.toList
  }

  /**
   * Closes `sock` if it was opened and is not already closed. A failure to
   * close is logged rather than propagated so that it cannot end the actor.
   */
  private def closeQuietly(sock: Socket) {
    if ((sock != null) && (!sock.isClosed)) {
      try {
        sock.close()
      } catch {
        case ex: IOException => println("DataCollector close failed: " + ex)
      }
    }
  }

}