// weather-servlet/src/main/scala/uk/org/floop/msc/wview/DataCollector.scala
package uk.org.floop.msc.wview

import scala.actors.Actor
import scala.collection.mutable.ListBuffer

import java.net.{InetSocketAddress, SocketTimeoutException}
import java.io.IOException
import java.nio.channels.{SocketChannel, ClosedByInterruptException}
import java.nio.ByteBuffer
import java.util.Date

import uk.org.floop.msc.rrd._

/**
 * Actor that connects to the loop-packet feed over TCP, decodes each packet
 * into a list of (field name, value) pairs and sends it to `DataStore` as a
 * `StorePacket`.
 *
 * The connection is re-opened after an end-of-stream / empty read or an
 * `IOException`, waiting `holdoff` milliseconds between attempts. The delay
 * doubles after every failed attempt until it passes the doubling limit and
 * is reset to its initial value as soon as any bytes are read. Interrupting
 * the thread (while reading or while waiting) stops the collector.
 */
object DataCollector extends Actor {

  /** Address of the feed this collector connects to. */
  private val FeedHost = "10.79.0.6"
  private val FeedPort = 11011

  /** Socket timeout requested on the connection, in milliseconds. */
  private val SocketTimeoutMillis = 30000

  /** Size in bytes of the receive buffer. */
  private val BufferSize = 4096

  /** Delay before the first reconnect attempt, in milliseconds. */
  private val InitialHoldoff = 1000

  /** The reconnect delay keeps doubling while it is below this value (milliseconds). */
  private val HoldoffDoublingLimit = 60000

  /** Number of bytes in `LoopPacket.START_FRAME` that must match before a packet is decoded. */
  private val StartFrameLength = 8

  /**
   * Runs the connect / read / decode loop until the thread is interrupted.
   *
   * Exceptions other than `IOException` and `InterruptedException` are not
   * handled here and propagate to the caller after the channel is closed.
   */
  def act(): Unit = {
    var running = true
    var holdoff = InitialHoldoff
    while (running) {
      // Stays None until a channel has actually been opened, so the cleanup
      // below never touches a channel that does not exist.
      var connection: Option[SocketChannel] = None
      // Set when this attempt ended in a way that calls for a delayed retry.
      var retryAfterDelay = false
      try {
        var startFramePos = 0 // number of start-frame bytes matched so far
        var readPos = 0       // absolute index of the next unparsed byte in bb
        var packetPos = 0     // index of the next field of the packet to decode
        val values = new ListBuffer[(String, Any)]()

        val sa = new InetSocketAddress(FeedHost, FeedPort)
        val sc = SocketChannel.open()
        connection = Some(sc)
        // NOTE(review): SO_TIMEOUT is set on the channel's socket view; confirm
        // that it actually bounds the blocking sc.read() calls below.
        sc.socket.setSoTimeout(SocketTimeoutMillis)
        sc.connect(sa)
        val bb = ByteBuffer.allocate(BufferSize)

        // bb.position is the number of bytes buffered so far; all parsing below
        // uses absolute gets, which leave the position untouched. The loop ends
        // on end-of-stream (-1) or when nothing was read (0), which also
        // happens once the buffer has no room left.
        while (sc.read(bb) > 0) {
          holdoff = InitialHoldoff
          if (startFramePos < StartFrameLength) { // still reading start frame
            while ((startFramePos < StartFrameLength) && (readPos < bb.position)) {
              if (bb.get(readPos) == LoopPacket.START_FRAME(startFramePos)) {
                readPos += 1
                startFramePos += 1
              } else { // start frame doesn't match; rewind
                // Resume one byte after the position where this partial match began.
                readPos = readPos - startFramePos + 1
                startFramePos = 0
              }
            }
          }
          if (startFramePos == StartFrameLength) {
            // Decode fields for as long as the next one is completely buffered.
            while ((packetPos < LoopPacket.fields.length) &&
                   (readPos <= (bb.position - VALUE_TYPE.sizeof(LoopPacket.fields(packetPos)._1)))) {
              LoopPacket.fields(packetPos) match {
                case (VALUE_TYPE.FLOAT, field)  => values += ((field, bb.getFloat(readPos)))
                // NOTE(review): getShort / get return signed values, so USHORT and
                // UCHAR fields above the signed range come out negative; confirm
                // whether consumers of StorePacket account for that.
                case (VALUE_TYPE.USHORT, field) => values += ((field, bb.getShort(readPos)))
                // A zero timestamp is stored as None; anything else is treated as
                // seconds and converted to a millisecond-based Date.
                case (VALUE_TYPE.TIME_T, field) => values += ((field, bb.getInt(readPos) match {
                  case 0 => None
                  case x => Some(new Date(x.toLong * 1000))
                }))
                case (VALUE_TYPE.SHORT, field)  => values += ((field, bb.getShort(readPos)))
                case (VALUE_TYPE.UCHAR, field)  => values += ((field, bb.get(readPos)))
              }
              readPos += VALUE_TYPE.sizeof(LoopPacket.fields(packetPos)._1)
              packetPos += 1
            }
            if (packetPos == LoopPacket.fields.length) { // read entire loop packet
              DataStore ! StorePacket(values.toList)
              values.clear()
              packetPos = 0
              startFramePos = 0
              if (readPos == bb.position) { // just reset the buffer
                bb.clear()
              } else { // move remaining bytes back to start
                System.arraycopy(bb.array, readPos, bb.array, 0, bb.position - readPos)
                bb.position(bb.position - readPos)
              }
              readPos = 0
            }
          }
        }
        println("DataCollector: empty read, restarting.")
        retryAfterDelay = true
      } catch {
        case ex: ClosedByInterruptException =>
          running = false // some other thread interrupted us, so don't continue
          println("DataCollector interrupted, closing down.")
        case ex: IOException => // includes SocketTimeoutException
          // could occur in open, connect or read; either way, close up but continue (re-open, etc.)
          println("DataCollector exception: " + ex)
          retryAfterDelay = true
      } finally {
        // Release the socket before waiting so it is not held open during the hold-off.
        connection.foreach(closeQuietly)
      }
      if (retryAfterDelay) {
        try {
          Thread.sleep(holdoff)
          if (holdoff < HoldoffDoublingLimit) {
            holdoff = holdoff * 2
          }
        } catch {
          case ex: InterruptedException =>
            // Interrupted while waiting: shut down the same way as an interrupted
            // read, and restore the interrupt flag that sleep() cleared.
            Thread.currentThread.interrupt()
            running = false
            println("DataCollector interrupted, closing down.")
        }
      }
    }
  }

  /** Closes `channel`, logging instead of propagating an `IOException` so the retry loop survives. */
  private def closeQuietly(channel: SocketChannel): Unit = {
    try {
      channel.close()
    } catch {
      case ex: IOException => println("DataCollector exception on close: " + ex)
    }
  }

}