#!/bin/env python3
# stats-registry / to_rdf.py
import re
import time
from datetime import datetime

from rdflib import Graph, URIRef, Literal, ConjunctiveGraph
from rdflib.namespace import Namespace, DCTERMS, RDF
from rdflib.store import NO_STORE
from requests import Session
from urllib.parse import urljoin

# Vocabulary namespaces used for the triples written by this script.
DCAT = Namespace('http://www.w3.org/ns/dcat#')
GDP = Namespace('http://gss-data.org.uk/def/gdp#')

# Local persistent store. If opening reports that no store exists yet,
# open it again asking for it to be created.
ds = ConjunctiveGraph('Sleepycat')
open_status = ds.open('datasets.db')
if open_status == NO_STORE:
    ds.open('datasets.db', create=True)
print(f'Datasets store has {len(ds)} triples')
print(f' and {len(list(ds.triples((None, RDF.type, DCAT.Dataset))))} datasets.')

# Query listing every organisation together with its label.
ORG_QUERY = """
PREFIX org: <http://www.w3.org/ns/org#>
SELECT DISTINCT ?org ?label
WHERE {
  ?org a org:Organization ;
    rdfs:label ?label .
}"""

# Build a label -> organisation lookup from the remote SPARQL endpoint; the
# endpoint is only needed for this one query, so it is closed straight after.
gss = Graph('SPARQLStore', identifier='http://gss-data.org.uk')
gss.open("http://gss-data.org.uk/sparql")
orgs = {}
for row in gss.query(ORG_QUERY):
    orgs[row.label.value] = row.org
gss.close()

# HTTP session shared by every request made through fetch_carefully().
s = Session()

# Paging state: the listing is walked page by page starting from the base URL.
datasets_url_base = 'https://www.gov.uk/government/statistics.json'
datasets_url = datasets_url_base
still_going = True

# Patterns applied to HTML fragments found in the listing's result fields.
abbr_re = re.compile(r'<abbr title="([^"]+)">')
collection_re = re.compile(r'Part of a collection: <a href="([^"]+)">')


def fetch_carefully(url):
    """Fetch ``url`` and return its decoded JSON body, retrying on failure.

    Up to 10 attempts are made with the module-level session ``s``. An
    attempt fails when the response status is not 200 or when the body cannot
    be decoded as JSON. Between failed attempts the function sleeps, starting
    at 5 seconds and doubling after every failure.

    Args:
        url: The URL to request.

    Returns:
        The decoded JSON document, or ``None`` if every attempt failed.
    """
    max_tries = 10
    holdoff = 5
    for attempt in range(1, max_tries + 1):
        resp = s.get(url)
        if resp.status_code == 200:
            try:
                return resp.json()
            except ValueError:
                # The body was not valid JSON; treat it like any other failed
                # attempt. Only decoding errors are caught so that
                # KeyboardInterrupt and SystemExit still propagate.
                pass
        if attempt < max_tries:
            # No point waiting after the final attempt: nothing follows it.
            time.sleep(holdoff)
            holdoff *= 2
    return None


# Walk the statistics listing page by page, recording each result in the
# local store, then report the store size and close it.

# When True, every listed result is (re)written regardless of the publication
# date already held in the store. The original condition was hard-wired to
# True; set this to False to only touch new or re-issued datasets and to stop
# paging at the first page that contains nothing fresh.
FORCE_UPDATE = True

try:
    while still_going:
        datasets = fetch_carefully(datasets_url)
        if datasets is None:
            # fetch_carefully() exhausted its retries; stop paging rather than
            # failing on the subscript below.
            print(f'Giving up: could not fetch {datasets_url}')
            break

        fresh_datasets = False
        for res_obj in datasets['results']:
            res = res_obj['result']

            # The organisation field is either an <abbr> fragment whose title
            # carries the name, or the name itself.
            abbr_match = abbr_re.match(res['organisations'])
            org_label = abbr_match.group(1) if abbr_match else res['organisations']
            publisher = orgs.get(org_label)

            issued = datetime.fromisoformat(res['public_timestamp'])

            # NOTE(review): the collection link is extracted but not yet
            # written to the store.
            collection = None
            if res.get('publication_collections') is not None:
                coll_match = collection_re.match(res['publication_collections'])
                if coll_match:
                    collection = coll_match.group(1)

            landing_page = URIRef(urljoin(datasets_url, res['url']))
            prev_publish_date = ds.value(
                subject=landing_page, predicate=DCTERMS.issued, any=False)
            needs_update = (
                FORCE_UPDATE
                or prev_publish_date is None
                or prev_publish_date.value != issued
            )
            if not needs_update:
                continue

            print(f"Updating {res['title']}")
            fresh_datasets = True
            # set() replaces any existing values for the subject/predicate,
            # so the type is reset before the statistics class is added.
            ds.set((landing_page, RDF.type, DCAT.Dataset))
            if res['display_type'] == 'National Statistics':
                ds.add((landing_page, RDF.type, GDP.NationalStatistics))
            elif res['display_type'] == 'Official Statistics':
                ds.add((landing_page, RDF.type, GDP.OfficialStatistics))
            ds.set((landing_page, DCTERMS.title, Literal(res['title'], lang="en-gb")))
            if publisher is not None:
                ds.set((landing_page, DCTERMS.publisher, publisher))
            ds.set((landing_page, DCTERMS.issued, Literal(issued)))

        # Continue only while pages keep yielding updates and name a
        # successor. A missing or empty next_page_url ends the walk; joining
        # an empty value would yield the current URL again and loop forever.
        next_page_url = datasets.get('next_page_url')
        if fresh_datasets and next_page_url:
            datasets_url = urljoin(datasets_url, next_page_url)
        else:
            still_going = False

    print(f'Datasets store has {len(ds)} triples.')
finally:
    # Always close the store, even if a page could not be processed.
    ds.close()