#!/usr/bin/env python3
"""Harvest dataset metadata from the gov.uk statistics listing into a
local RDF store, typed and labelled using DCAT and DCTERMS."""
import re
import time
from datetime import datetime
from urllib.parse import urljoin

from rdflib import Graph, URIRef, Literal, ConjunctiveGraph
from rdflib.namespace import Namespace, DCTERMS, RDF
from rdflib.store import NO_STORE
from requests import Session

DCAT = Namespace('http://www.w3.org/ns/dcat#')
GDP = Namespace('http://gss-data.org.uk/def/gdp#')

# Open the local Berkeley DB backed store, creating it on first run.
ds = ConjunctiveGraph('Sleepycat')
if ds.open('datasets.db') == NO_STORE:
    ds.open('datasets.db', create=True)

print(f'Datasets store has {len(ds)} triples')
print(f' and {len(list(ds.triples((None, RDF.type, DCAT.Dataset))))} datasets.')

# Fetch the known publisher organisations from the GSS SPARQL endpoint,
# keyed by their rdfs:label so they can be matched against gov.uk names.
gss = Graph('SPARQLStore', identifier='http://gss-data.org.uk')
gss.open('http://gss-data.org.uk/sparql')
orgs = {org.label.value: org.org for org in gss.query("""
    PREFIX org: <http://www.w3.org/ns/org#>
    SELECT DISTINCT ?org ?label WHERE {
        ?org a org:Organization ;
            rdfs:label ?label .
    }""")}
gss.close()

datasets_url_base = 'https://www.gov.uk/government/statistics.json'
s = Session()
still_going = True
datasets_url = datasets_url_base

abbr_re = re.compile(r'<abbr title="([^"]+)">')
collection_re = re.compile(r'Part of a collection: <a href="([^"]+)">')


def fetch_carefully(url):
    """Fetch a JSON document, retrying with exponential backoff."""
    tries = 0
    holdoff = 5
    while tries < 10:
        resp = s.get(url)
        if resp.status_code == 200:
            try:
                return resp.json()
            except ValueError:
                pass
        time.sleep(holdoff)
        tries += 1
        holdoff *= 2
    # Fail loudly rather than returning None for the caller to trip over.
    raise RuntimeError(f'Gave up fetching {url} after {tries} tries')


while still_going:
    datasets = fetch_carefully(datasets_url)
    fresh_datasets = False
    for res_obj in datasets['results']:
        res = res_obj['result']
        publisher = None
        issued = None
        collection = None
        # The organisations field is either an <abbr> tag or a plain label.
        abbr_match = abbr_re.match(res['organisations'])
        if abbr_match:
            publisher = orgs.get(abbr_match.group(1))
        else:
            publisher = orgs.get(res['organisations'])
        issued = datetime.fromisoformat(res['public_timestamp'])
        # The collection URL is parsed here but not yet stored as a triple.
        if res.get('publication_collections') is not None:
            coll_match = collection_re.match(res['publication_collections'])
            if coll_match:
                collection = coll_match.group(1)
        landingPage = URIRef(urljoin(datasets_url, res['url']))
        prev_publish_date = ds.value(subject=landingPage,
                                     predicate=DCTERMS.issued, any=False)
        # Only touch records that are new or whose publication date changed.
        if prev_publish_date is None or prev_publish_date.value != issued:
            print(f"Updating {res['title']}")
            fresh_datasets = True
            ds.set((landingPage, RDF.type, DCAT.Dataset))
            if res['display_type'] == 'National Statistics':
                ds.add((landingPage, RDF.type, GDP.NationalStatistics))
            elif res['display_type'] == 'Official Statistics':
                ds.add((landingPage, RDF.type, GDP.OfficialStatistics))
            ds.set((landingPage, DCTERMS.title,
                    Literal(res['title'], lang='en-gb')))
            if publisher is not None:
                ds.set((landingPage, DCTERMS.publisher, publisher))
            if issued is not None:
                ds.set((landingPage, DCTERMS.issued, Literal(issued)))
    # Keep paging while this page yielded fresh datasets and another exists.
    if fresh_datasets and 'next_page_url' in datasets:
        datasets_url = urljoin(datasets_url, datasets['next_page_url'])
    else:
        still_going = False

print(f'Datasets store has {len(ds)} triples.')
ds.close()
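
# A minimal sketch of querying the populated store afterwards, kept as a
# comment so the harvester itself is unchanged. It assumes the same
# 'datasets.db' Sleepycat directory created above; the SPARQL is
# illustrative only:
#
#   from rdflib import ConjunctiveGraph
#   g = ConjunctiveGraph('Sleepycat')
#   g.open('datasets.db')
#   q = '''
#       PREFIX dcat: <http://www.w3.org/ns/dcat#>
#       PREFIX dct: <http://purl.org/dc/terms/>
#       SELECT ?ds ?title WHERE { ?ds a dcat:Dataset ; dct:title ?title }
#   '''
#   for row in g.query(q):
#       print(row.ds, row.title)
#   g.close()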