Newer
Older
weather-servlet / src / main / scala / uk / org / floop / msc / rrd / DataStore.scala
package uk.org.floop.msc.rrd

import scala.actors.Actor
import scala.actors.Actor._
import scala.collection.mutable.ListBuffer
import java.io.{File, IOException, ByteArrayOutputStream, InputStream}
import java.util.{Date, Calendar}
import java.awt.Color
import java.awt.image.BufferedImage
import javax.imageio.stream.MemoryCacheImageOutputStream
import javax.imageio.ImageIO
import _root_.org.xml.sax.InputSource
import _root_.org.rrd4j.core.{RrdDef, RrdDb}
import _root_.org.rrd4j.{DsType, ConsolFun}
import _root_.org.rrd4j.graph.{RrdGraphDef, RrdGraph, RrdGraphDefTemplate}
import net.liftweb.http.LiftRules
import net.liftweb.common.Full
import net.liftweb.common.Empty
import net.liftweb.http.CometActor


/**
 * The time windows a graph can cover.
 *
 * The windows are fixed-length approximations: a MONTH is four weeks
 * (28 days) and a YEAR is twelve such months (336 days).
 */
object TimePeriod extends Enumeration {
  val HOUR, DAY, WEEK, MONTH, YEAR = Value

  /**
   * Computes the window of length `p` that ends at the current time.
   *
   * @param p the period to look back over
   * @return a two-element array `(start, end)` holding seconds since the
   *         epoch, where `end` is "now" and `start` is `end` minus the
   *         length of `p`
   */
  def last(p: Value): Array[Long] = {
    val secondsPerHour = 60 * 60
    val secondsPerDay = secondsPerHour * 24
    val secondsPerWeek = secondsPerDay * 7
    val secondsPerMonth = secondsPerWeek * 4

    val windowLength = p match {
      case HOUR => secondsPerHour
      case DAY => secondsPerDay
      case WEEK => secondsPerWeek
      case MONTH => secondsPerMonth
      case YEAR => secondsPerMonth * 12
    }

    // Calendar reports milliseconds; the window is expressed in whole seconds.
    val end = Calendar.getInstance.getTimeInMillis / 1000
    Array(end - windowLength, end)
  }
}

// Messages exchanged with the DataStore actor.

/**
 * Asks DataStore to record a weather packet.
 *
 * DataStore keeps `p` as the current weather, writes one sample to the RRD
 * file and then sends a [[CurrentWeather]] to every registered listener.
 *
 * @param p (field name, value) pairs; only fields whose name matches a data
 *          source in the RRD are written, and values that are not a
 *          `Number` are stored as NaN
 */
case class StorePacket(p: List[Pair[String, Any]])

/**
 * Asks DataStore to render a graph. The reply is `Some(ImageGenerated)` on
 * success, or `None` if no graph could be produced.
 *
 * @param template name of the resource, resolved via
 *                 `LiftRules.doWithResource`, holding the graph template
 * @param p        the time period the graph should cover, ending now
 */
case class GenerateGraph(template: String, p: TimePeriod.Value)

/**
 * Successful result of a [[GenerateGraph]] request.
 *
 * @param bytes the rendered graph, PNG encoded
 */
case class ImageGenerated(bytes: Array[Byte])

/**
 * Registers `l` to receive [[CurrentWeather]] updates. DataStore also
 * replies to the sender with the current weather straight away.
 */
case class AddWeatherListener(l: CometActor)

/** Stops `l` from receiving further [[CurrentWeather]] updates. */
case class RemoveWeatherListener(l: CometActor)

/**
 * The most recently stored weather packet, as sent by DataStore in reply to
 * [[AddWeatherListener]] and to all listeners after each [[StorePacket]].
 *
 * @param w (field name, value) pairs; empty until the first packet is stored
 */
case class CurrentWeather(w: List[Pair[String, Any]])

/** Asks DataStore to dump the RRD file as XML to /usr/local/weather/weather.xml. */
case class DumpXml()

/**
 * Actor that owns the weather round-robin database (RRD) file.
 *
 * On first initialisation the RRD file is created if it does not exist, and
 * the actor is started. It then handles these messages:
 *
 *  - [[AddWeatherListener]] / [[RemoveWeatherListener]]: maintain the list of
 *    listeners; adding a listener replies with the current weather.
 *  - [[StorePacket]]: remember the packet, write a sample to the RRD and
 *    notify all listeners.
 *  - [[GenerateGraph]]: reply with `Some(ImageGenerated)` or `None`.
 *  - [[DumpXml]]: dump the RRD as XML.
 *
 * Failures while handling a message are printed and otherwise swallowed so
 * that a single bad request cannot terminate the actor.
 */
object DataStore extends Actor {

  /** The RRD file all samples are written to and all graphs are read from. */
  val STORE = new File("/usr/local/weather/weather.rrd")

  /**
   * The data sources of the RRD as (name, minimum, maximum).
   * NaN bounds are handed to rrd4j unchanged (presumably meaning "no limit";
   * confirm against the rrd4j documentation).
   */
  val DS_LIST: List[Triple[String, Double, Double]] = List(
    ("barometer", 26, 32),
    ("stationPressure", 26, 32),
    ("altimeter", 26, 32),
    ("inTemp", 32, 140),
    ("outTemp", -40, 150),
    ("inHumidity", 0, 100),
    ("outHumidity", 0, 100),
    ("windSpeed", 0, 150),
    ("windDir", 0, 360),
    ("windGust", 0, 150),
    ("windGustDir", 0, 360),
    ("rainRate", 0, 100),
    ("sampleRain", 0, 100),
    ("sampleET", Double.NaN, Double.NaN),
    ("radiation", Double.NaN, Double.NaN),
    ("UV", Double.NaN, Double.NaN),
    ("dewpoint", -105, 130),
    ("windchill", -120, 130),
    ("heatindex", -40, 135),
    // skip computed fields
    ("rxCheckPercent", 0, 100),
    // skip forecasts
    ("txBatteryStatus", Double.NaN, Double.NaN),
    ("consBatteryVoltage", Double.NaN, Double.NaN)
  )

  // Create the database the first time round: one gauge per data source and,
  // for each consolidation function, five archives of decreasing resolution.
  if (!STORE.exists) {
    val rrdDef = new RrdDef(STORE.getPath)
    rrdDef.setStartTime(new Date())
    rrdDef.setStep(15)
    DS_LIST.foreach { case (name, min, max) =>
      rrdDef.addDatasource(name, DsType.GAUGE, 300, min, max)
    }
    List(ConsolFun.AVERAGE, ConsolFun.MAX, ConsolFun.MIN).foreach { fun =>
      rrdDef.addArchive(fun, 0.5, 1, 240) // 15 second consolidated, hour stored
      rrdDef.addArchive(fun, 0.5, 60, 24*4) // 15 minute consolidated, day stored
      rrdDef.addArchive(fun, 0.5, 240, 24*7) // Hour consolidated, week stored
      rrdDef.addArchive(fun, 0.5, 240*24, 28) // Day consolidated, month stored
      rrdDef.addArchive(fun, 0.5, 240*24*7, 52) // Week consolidated, year stored
    }
    val rrdb = new RrdDb(rrdDef)
    rrdb.close()
  }

  /** The last packet received via [[StorePacket]]; empty until the first one arrives. */
  var currentWeather: List[Pair[String, Any]] = Nil

  /** The actors that are sent a [[CurrentWeather]] after every stored packet. */
  val listeners = new ListBuffer[CometActor]

  /** Message loop; see the object documentation for the protocol. */
  def act(): Unit = {
    loop {
      react {
        case AddWeatherListener(l) =>
          listeners += l
          reply(CurrentWeather(currentWeather))
        case RemoveWeatherListener(l) =>
          listeners -= l
        case DumpXml() =>
          dumpXml()
        case StorePacket(p) =>
          currentWeather = p
          storeSample(p)
          updateListeners()
        case GenerateGraph(t, p) =>
          reply(generateGraph(t, p))
      }
    }
  }

  /** Dumps the RRD to /usr/local/weather/weather.xml, always closing the database. */
  private def dumpXml(): Unit = {
    try {
      val rrdb = new RrdDb(STORE.getPath)
      try {
        rrdb.dumpXml("/usr/local/weather/weather.xml")
      } finally {
        rrdb.close()
      }
    } catch {
      // Report and carry on: an escaping exception would terminate the actor.
      case ex: IOException => println("DumpXml, IOException: " + ex.toString)
    }
  }

  /**
   * Writes one sample built from `p` to the RRD.
   *
   * Fields without a matching data source are ignored; values that are not a
   * `Number` are stored as NaN. I/O and argument errors are printed and
   * swallowed.
   */
  private def storeSample(p: List[Pair[String, Any]]): Unit = {
    try {
      val rrdb = new RrdDb(STORE.getPath)
      try {
        val sample = rrdb.createSample()
        p.foreach { case (name, value) =>
          if (rrdb.containsDs(name)) {
            val reading = value match {
              case n: Number => n.doubleValue
              case _ => Double.NaN
            }
            sample.setValue(name, reading)
          }
        }
        sample.update()
      } finally {
        rrdb.close()
      }
    } catch {
      case ex: IOException => println("StorePacket, IOException: " + ex.toString)
      case ex: IllegalArgumentException => println("StorePacket, IllegalArgumentException: " + ex.toString)
    }
  }

  /**
   * Renders the graph described by the resource `template` over the period
   * `period`, ending now.
   *
   * @return the PNG image, or `None` when the template resource cannot be
   *         loaded (any non-`Full` box) or anything fails while building or
   *         rendering the graph
   */
  private def generateGraph(template: String, period: TimePeriod.Value): Option[ImageGenerated] = {
    try {
      val graphDef = LiftRules.doWithResource(template) { inputStream =>
        val templ = new RrdGraphDefTemplate(new InputSource(inputStream))
        templ.setVariable("rrdFile", STORE.getPath)
        templ.getRrdGraphDef()
      }
      graphDef match {
        case Full(gd) =>
          gd.setTimeSpan(TimePeriod.last(period))
          gd.setFilename("-") // in memory only
          gd.setImageFormat("PNG")
          gd.setAntiAliasing(true)
          gd.setTextAntiAliasing(true)

          val graph = new RrdGraph(gd)
          val info = graph.getRrdGraphInfo()
          val image = new BufferedImage(info.getWidth, info.getHeight, BufferedImage.TYPE_INT_ARGB)
          val graphics = image.getGraphics
          try {
            graph.render(graphics)
          } finally {
            // Release the graphics context instead of waiting for finalisation.
            graphics.dispose()
          }
          val buffer = new ByteArrayOutputStream()
          ImageIO.write(image, "PNG", new MemoryCacheImageOutputStream(buffer))
          Some(ImageGenerated(buffer.toByteArray))
        case _ =>
          // Empty or Failure: there is no template to render.
          None
      }
    } catch {
      case ex: Exception =>
        println(ex)
        None
    }
  }

  /** Sends the current weather to every registered listener. */
  private def updateListeners(): Unit = {
    listeners.foreach( _ ! CurrentWeather(currentWeather) )
  }

  // Must stay last so that all state above is initialised before the actor runs.
  start()

}