diff --git a/src/uk/org/floop/jenkins_pmd/Drafter.groovy b/src/uk/org/floop/jenkins_pmd/Drafter.groovy index 0eef84b..007fe50 100644 --- a/src/uk/org/floop/jenkins_pmd/Drafter.groovy +++ b/src/uk/org/floop/jenkins_pmd/Drafter.groovy @@ -1,6 +1,7 @@ package uk.org.floop.jenkins_pmd import groovy.json.JsonSlurper +import groovy.transform.InheritConstructors import hudson.FilePath import org.apache.http.HttpHost import org.apache.http.HttpResponse @@ -71,46 +72,88 @@ def createDraftset(String label) { String displayName = URLEncoder.encode(label, "UTF-8") String path = "/v1/draftsets?display-name=${displayName}" - HttpResponse response = getExec().execute( - Request.Post(apiBase.resolve(path)) - .addHeader("Accept", "application/json") - .userAgent(PMDConfig.UA) - ).returnResponse() - if (response.getStatusLine().statusCode == 200) { - return new JsonSlurper().parse(EntityUtils.toByteArray(response.getEntity())) - } else { - throw new DrafterException("Problem creating draftset ${errorMsg(response)}") + int retries = 5 + while (retries > 0) { + HttpResponse response = getExec().execute( + Request.Post(apiBase.resolve(path)) + .addHeader("Accept", "application/json") + .userAgent(PMDConfig.UA) + ).returnResponse() + if (response.getStatusLine().statusCode == 200) { + return new JsonSlurper().parse(EntityUtils.toByteArray(response.getEntity())) + } else if (response.getStatusLine().statusCode == 503) { + waitForLock() + } else { + throw new DrafterException("Problem creating draftset ${errorMsg(response)}") + } + retries = retries - 1 + } + throw new DrafterException("Problem creating draftset, maximum retries reached while waiting for lock.") + } + + def waitForLock() { + Boolean waiting = true + int holdOffTime = 5 + while(waiting) { + sleep(holdOffTime * 1000) + HttpResponse response = getExec().execute( + Request.Get(apiBase.resolve('/status/writes-locked')) + .addHeader('Accept', 'application/json') + .userAgent(PMDConfig.UA) + ).returnResponse() + if (response.getStatusLine().statusCode == 200) { + waiting = new JsonSlurper().parse(EntityUtils.toByteArray(response.getEntity())) + } else { + throw new DrafterException("Problem waiting for write-lock ${errorMsg(response)}") + } + if (waiting && (holdOffTime < 60)) { + holdOffTime = holdOffTime * 2 + } } } def deleteGraph(String draftsetId, String graph) { String encGraph = URLEncoder.encode(graph, "UTF-8") String path = "/v1/draftset/${draftsetId}/graph?graph=${encGraph}&silent=true" - HttpResponse response = getExec().execute( - Request.Delete(apiBase.resolve(path)) - .addHeader("Accept", "application/json") - .userAgent(PMDConfig.UA) - ).returnResponse() - if (response.getStatusLine().statusCode == 200) { - return new JsonSlurper().parse(EntityUtils.toByteArray(response.getEntity())) - } else { - throw new DrafterException("Problem deleting graph ${errorMsg(response)}") + int retries = 5 + while (retries > 0) { + HttpResponse response = getExec().execute( + Request.Delete(apiBase.resolve(path)) + .addHeader("Accept", "application/json") + .userAgent(PMDConfig.UA) + ).returnResponse() + if (response.getStatusLine().statusCode == 200) { + return new JsonSlurper().parse(EntityUtils.toByteArray(response.getEntity())) + } else if (response.getStatusLine().statusCode == 503) { + waitForLock() + } else { + throw new DrafterException("Problem deleting graph ${errorMsg(response)}") + } + retries = retries - 1 } + throw new DrafterException("Problem deleting graph, maximum retries reached while waiting for lock.") } def deleteDraftset(String draftsetId) { String path = "/v1/draftset/${draftsetId}" - HttpResponse response = getExec().execute( - Request.Delete(apiBase.resolve(path)) - .addHeader("Accept", "application/json") - .userAgent(PMDConfig.UA) - ).returnResponse() - if (response.getStatusLine().statusCode == 202) { - def jobObj = new JsonSlurper().parse(EntityUtils.toByteArray(response.getEntity())) - waitForJob(apiBase.resolve(jobObj['finished-job'] as String), jobObj['restart-id'] as String) - } else { - throw new DrafterException("Problem deleting draftset ${errorMsg(response)}") + int retries = 5 + while (retries > 0) { + HttpResponse response = getExec().execute( + Request.Delete(apiBase.resolve(path)) + .addHeader("Accept", "application/json") + .userAgent(PMDConfig.UA) + ).returnResponse() + if (response.getStatusLine().statusCode == 202) { + def jobObj = new JsonSlurper().parse(EntityUtils.toByteArray(response.getEntity())) + waitForJob(apiBase.resolve(jobObj['finished-job'] as String), jobObj['restart-id'] as String) + } else if (response.getStatusLine().statusCode == 503) { + waitForLock() + } else { + throw new DrafterException("Problem deleting draftset ${errorMsg(response)}") + } + retries = retries - 1 } + throw new DrafterException("Problem deleting draftset, maximum retries reached while waiting for lock.") } def waitForJob(URI finishedJob, String restartId) { @@ -154,18 +197,26 @@ String encGraph = URLEncoder.encode(graph, "UTF-8") path = path + "?graph=${encGraph}" } - HttpResponse response = getExec().execute( - Request.Put(apiBase.resolve(path)) - .addHeader("Accept", "application/json") - .userAgent(PMDConfig.UA) - .bodyStream(new FilePath(new File(fileName)).read(), ContentType.create(mimeType, encoding)) - ).returnResponse() - if (response.getStatusLine().statusCode == 202) { - def jobObj = new JsonSlurper().parse(EntityUtils.toByteArray(response.getEntity())) - waitForJob(apiBase.resolve(jobObj['finished-job'] as String), jobObj['restart-id'] as String) - } else { - throw new DrafterException("Problem adding data ${errorMsg(response)}") + int retries = 5 + while (retries > 0) { + HttpResponse response = getExec().execute( + Request.Put(apiBase.resolve(path)) + .addHeader("Accept", "application/json") + .userAgent(PMDConfig.UA) + .bodyStream(new FilePath(new File(fileName)).read(), ContentType.create(mimeType, encoding)) + ).returnResponse() + if (response.getStatusLine().statusCode == 202) { + def jobObj = new JsonSlurper().parse(EntityUtils.toByteArray(response.getEntity())) + waitForJob(apiBase.resolve(jobObj['finished-job'] as String), jobObj['restart-id'] as String) + return + } else if (response.getStatusLine().statusCode == 503) { + waitForLock() + } else { + throw new DrafterException("Problem adding data ${errorMsg(response)}") + } + retries = retries - 1 } + throw new DrafterException("Problem adding data, maximum retries reached while waiting for lock.") } def findDraftset(String displayName) { @@ -182,34 +233,40 @@ def publishDraftset(String id) { String path = "/v1/draftset/${id}/publish" Executor exec = getExec() - HttpResponse response = exec.execute( - Request.Post(apiBase.resolve(path)) - .addHeader("Accept", "application/json") - .userAgent(PMDConfig.UA) - ).returnResponse() - if (response.getStatusLine().statusCode == 202) { - def jobObj = new JsonSlurper().parse(EntityUtils.toByteArray(response.getEntity())) - waitForJob(apiBase.resolve(jobObj['finished-job'] as String), jobObj['restart-id'] as String) - if (pmd.config.empty_cache) { - exec.execute( - Request.Put(pmd.config.empty_cache) - .addHeader("Accept", "application/json") - .userAgent(PMDConfig.UA)) + int retries = 5 + while (retries > 0) { + HttpResponse response = exec.execute( + Request.Post(apiBase.resolve(path)) + .addHeader("Accept", "application/json") + .userAgent(PMDConfig.UA) + ).returnResponse() + if (response.getStatusLine().statusCode == 202) { + def jobObj = new JsonSlurper().parse(EntityUtils.toByteArray(response.getEntity())) + waitForJob(apiBase.resolve(jobObj['finished-job'] as String), jobObj['restart-id'] as String) + if (pmd.config.empty_cache) { + exec.execute( + Request.Put(pmd.config.empty_cache) + .addHeader("Accept", "application/json") + .userAgent(PMDConfig.UA)) + } + if (pmd.config.sync_search) { + exec.execute( + Request.Put(pmd.config.sync_search) + .addHeader("Accept", "application/json") + .userAgent(PMDConfig.UA)) + } + return + } else if (response.getStatusLine().statusCode == 503) { + waitForLock() + } else { + throw new DrafterException("Problem publishing draftset ${errorMsg(response)}") } - if (pmd.config.sync_search) { - exec.execute( - Request.Put(pmd.config.sync_search) - .addHeader("Accept", "application/json") - .userAgent(PMDConfig.UA)) - } - - } else { - throw new DrafterException("Problem publishing draftset ${errorMsg(response)}") + retries = retries - 1 } - - + throw new DrafterException("Problem publishing draftset, maximum retries reached while waiting for lock.") } } +@InheritConstructors class DrafterException extends Exception { } diff --git a/src/uk/org/floop/jenkins_pmd/Pipelines.groovy b/src/uk/org/floop/jenkins_pmd/Pipelines.groovy index 52ff03b..5062e17 100644 --- a/src/uk/org/floop/jenkins_pmd/Pipelines.groovy +++ b/src/uk/org/floop/jenkins_pmd/Pipelines.groovy @@ -2,6 +2,7 @@ import groovy.json.JsonOutput import groovy.json.JsonSlurper +import groovy.transform.InheritConstructors import hudson.FilePath import org.apache.http.HttpEntity import org.apache.http.HttpHost @@ -80,7 +81,7 @@ .addHeader('Accept', 'text/csv') .execute().returnContent().asStream() } else { - mappingStream = new FilePath(mapping).read() + mappingStream = new FilePath(new File(mapping)).read() } body.addBinaryBody( 'columns-csv', @@ -105,4 +106,5 @@ } } +@InheritConstructors class PipelineException extends Exception { } \ No newline at end of file diff --git a/test/integration/groovy/uk/org/floop/jenkins_pmd/DrafterTests.groovy b/test/integration/groovy/uk/org/floop/jenkins_pmd/DrafterTests.groovy index d2a3e21..eceaf2e 100644 --- a/test/integration/groovy/uk/org/floop/jenkins_pmd/DrafterTests.groovy +++ b/test/integration/groovy/uk/org/floop/jenkins_pmd/DrafterTests.groovy @@ -281,4 +281,131 @@ rule.assertLogContains('no job draft to delete', firstResult) } + @Test + void "publish blocks writes"() { + instanceRule.stubFor(post("/v1/draftsets?display-name=project") + .inScenario("Write lock") + .whenScenarioStateIs(Scenario.STARTED) + .withHeader("Accept", equalTo("application/json")) + .withBasicAuth("admin", "admin") + .willReturn(aResponse().withStatus(503))) + instanceRule.stubFor(post("/v1/draftsets?display-name=project") + .inScenario("Write lock") + .withHeader("Accept", equalTo("application/json")) + .withBasicAuth("admin", "admin") + .willReturn(seeOther("/v1/draftset/4e376c57-6816-404a-8945-94849299f2a0"))) + instanceRule.stubFor(get(urlMatching("/v1/draftsets.*")) + .withHeader("Accept", equalTo("application/json")) + .withBasicAuth("admin", "admin") + .willReturn(ok() + .withBodyFile("listDraftsets.json") + .withHeader("Content-Type", "application/json"))) + instanceRule.stubFor(get("/v1/draftset/4e376c57-6816-404a-8945-94849299f2a0") + .withHeader("Accept", equalTo("application/json")) + .withBasicAuth("admin", "admin") + .willReturn(ok() + .withBodyFile("newDraftset.json") + .withHeader("Content-Type", "application/json"))) + instanceRule.stubFor(delete("/v1/draftset/4e376c57-6816-404a-8945-94849299f2a0") + .inScenario("Write lock") + .whenScenarioStateIs(Scenario.STARTED) + .withHeader("Accept", equalTo("application/json")) + .withBasicAuth("admin", "admin") + .willReturn(aResponse().withStatus(503))) + instanceRule.stubFor(delete("/v1/draftset/4e376c57-6816-404a-8945-94849299f2a0") + .inScenario("Write lock") + .withHeader("Accept", equalTo("application/json")) + .withBasicAuth("admin", "admin") + .willReturn(aResponse() + .withStatus(202) + .withBodyFile("deleteJob.json") + .withHeader("Content-Type", "application/json"))) + instanceRule.stubFor(delete(urlMatching("/v1/draftset/4e376c57-6816-404a-8945-94849299f2a0/graph.*")) + .inScenario("Write lock") + .whenScenarioStateIs(Scenario.STARTED) + .withHeader("Accept", equalTo("application/json")) + .withBasicAuth("admin", "admin") + .willReturn(aResponse().withStatus(503))) + instanceRule.stubFor(delete(urlMatching("/v1/draftset/4e376c57-6816-404a-8945-94849299f2a0/graph.*")) + .inScenario("Write lock") + .withHeader("Accept", equalTo("application/json")) + .withBasicAuth("admin", "admin") + .willReturn(aResponse() + .withStatus(200) + .withBodyFile("deleteGraph.json") + .withHeader("Content-Type", "application/json"))) + instanceRule.stubFor(put("/v1/draftset/4e376c57-6816-404a-8945-94849299f2a0/data") + .inScenario("Write lock") + .whenScenarioStateIs(Scenario.STARTED) + .withHeader("Accept", equalTo("application/json")) + .withBasicAuth("admin", "admin") + .willReturn(aResponse().withStatus(503))) + instanceRule.stubFor(put("/v1/draftset/4e376c57-6816-404a-8945-94849299f2a0/data") + .inScenario("Write lock") + .withHeader("Accept", equalTo("application/json")) + .withBasicAuth("admin", "admin") + .willReturn(aResponse() + .withStatus(202) + .withBodyFile("addDataJob.json") + .withHeader("Content-Type", "application/json"))) + instanceRule.stubFor(get("/v1/status/finished-jobs/2c4111e5-a299-4526-8327-bad5996de400").inScenario("Delete draftset") + .whenScenarioStateIs(Scenario.STARTED) + .withHeader("Accept", equalTo("application/json")) + .withBasicAuth("admin", "admin") + .willReturn(aResponse().withStatus(404).withBodyFile('notFinishedJob.json')) + .willSetStateTo("Finished")) + instanceRule.stubFor(get("/v1/status/finished-jobs/2c4111e5-a299-4526-8327-bad5996de400").inScenario("Delete draftset") + .whenScenarioStateIs("Finished") + .withHeader("Accept", equalTo("application/json")) + .withBasicAuth("admin", "admin") + .willReturn(ok() + .withBodyFile("finishedJobOk.json"))) + instanceRule.stubFor(get("/columns.csv").willReturn(ok().withBodyFile('columns.csv'))) + instanceRule.stubFor(post("/v1/pipelines/ons-table2qb.core/data-cube/import") + .inScenario("Write lock") + .whenScenarioStateIs(Scenario.STARTED) + .withHeader('Accept', equalTo('application/json')) + .withBasicAuth('admin', 'admin') + .willReturn(aResponse().withStatus(503))) + instanceRule.stubFor(post("/v1/pipelines/ons-table2qb.core/data-cube/import") + .inScenario("Write lock") + .withHeader('Accept', equalTo('application/json')) + .withBasicAuth('admin', 'admin') + .willReturn(aResponse().withStatus(202).withBodyFile('cubeImportJob.json'))) + instanceRule.stubFor(get('/status/finished-jobs/4fc9ad42-f964-4f56-a1ab-a00bd622b84c') + .withHeader('Accept', equalTo('application/json')) + .withBasicAuth('admin', 'admin') + .willReturn(ok().withBodyFile('finishedImportJobOk.json'))) + instanceRule.stubFor(get('/status/writes-locked') + .inScenario("Write lock") + .whenScenarioStateIs(Scenario.STARTED) + .withHeader('Accept', equalTo('application/json')) + .withBasicAuth('admin', 'admin') + .willReturn(ok().withBody('false')) + .willSetStateTo('Still Publishing')) + instanceRule.stubFor(get('/status/writes-locked') + .inScenario("Write lock") + .whenScenarioStateIs('Still publishing') + .withHeader('Accept', equalTo('application/json')) + .withBasicAuth('admin', 'admin') + .willReturn(ok().withBody('true')) + .willSetStateTo('Published')) + final CpsFlowDefinition flow = new CpsFlowDefinition(""" + node { + dir('out') { + writeFile file:'dataset.trig', text:'dummy:data' + writeFile file:'observations.csv', text:'Dummy,CSV' + } + jobDraft.replace() + uploadTidy(['out/observations.csv'], + '${wireMockRule.baseUrl()}/columns.csv') + }""".stripIndent(), true) + final WorkflowJob workflowJob = rule.createProject(WorkflowJob, 'project') + workflowJob.definition = flow + + final WorkflowRun firstResult = rule.buildAndAssertSuccess(workflowJob) + instanceRule.verify(postRequestedFor(urlEqualTo('/v1/pipelines/ons-table2qb.core/data-cube/import')) + .withHeader('Accept', equalTo('application/json'))) + } + }