Merge pull request #306 from actions/with-retries

Add retries to all API calls
This commit is contained in:
David Hadka 2020-05-11 16:56:45 -05:00 committed by GitHub
commit bac1a40c81
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 350 additions and 74 deletions

View file

@ -1,4 +1,4 @@
import { getCacheVersion } from "../src/cacheHttpClient"; import { getCacheVersion, retry } from "../src/cacheHttpClient";
import { CompressionMethod, Inputs } from "../src/constants"; import { CompressionMethod, Inputs } from "../src/constants";
import * as testUtils from "../src/utils/testUtils"; import * as testUtils from "../src/utils/testUtils";
@ -37,3 +37,141 @@ test("getCacheVersion with gzip compression does not change vesion", async () =>
test("getCacheVersion with no input throws", async () => { test("getCacheVersion with no input throws", async () => {
expect(() => getCacheVersion()).toThrow(); expect(() => getCacheVersion()).toThrow();
}); });
interface TestResponse {
statusCode: number;
result: string | null;
}
function handleResponse(
response: TestResponse | undefined
): Promise<TestResponse> {
if (!response) {
fail("Retry method called too many times");
}
if (response.statusCode === 999) {
throw Error("Test Error");
} else {
return Promise.resolve(response);
}
}
async function testRetryExpectingResult(
responses: Array<TestResponse>,
expectedResult: string | null
): Promise<void> {
responses = responses.reverse(); // Reverse responses since we pop from end
const actualResult = await retry(
"test",
() => handleResponse(responses.pop()),
(response: TestResponse) => response.statusCode
);
expect(actualResult.result).toEqual(expectedResult);
}
async function testRetryExpectingError(
responses: Array<TestResponse>
): Promise<void> {
responses = responses.reverse(); // Reverse responses since we pop from end
expect(
retry(
"test",
() => handleResponse(responses.pop()),
(response: TestResponse) => response.statusCode
)
).rejects.toBeInstanceOf(Error);
}
test("retry works on successful response", async () => {
await testRetryExpectingResult(
[
{
statusCode: 200,
result: "Ok"
}
],
"Ok"
);
});
test("retry works after retryable status code", async () => {
await testRetryExpectingResult(
[
{
statusCode: 503,
result: null
},
{
statusCode: 200,
result: "Ok"
}
],
"Ok"
);
});
test("retry fails after exhausting retries", async () => {
await testRetryExpectingError([
{
statusCode: 503,
result: null
},
{
statusCode: 503,
result: null
},
{
statusCode: 200,
result: "Ok"
}
]);
});
test("retry fails after non-retryable status code", async () => {
await testRetryExpectingError([
{
statusCode: 500,
result: null
},
{
statusCode: 200,
result: "Ok"
}
]);
});
test("retry works after error", async () => {
await testRetryExpectingResult(
[
{
statusCode: 999,
result: null
},
{
statusCode: 200,
result: "Ok"
}
],
"Ok"
);
});
test("retry returns after client error", async () => {
await testRetryExpectingResult(
[
{
statusCode: 400,
result: null
},
{
statusCode: 200,
result: "Ok"
}
],
null
);
});

74
dist/restore/index.js vendored
View file

@ -2197,6 +2197,12 @@ function isSuccessStatusCode(statusCode) {
} }
return statusCode >= 200 && statusCode < 300; return statusCode >= 200 && statusCode < 300;
} }
function isServerErrorStatusCode(statusCode) {
if (!statusCode) {
return true;
}
return statusCode >= 500;
}
function isRetryableStatusCode(statusCode) { function isRetryableStatusCode(statusCode) {
if (!statusCode) { if (!statusCode) {
return false; return false;
@ -2246,13 +2252,57 @@ function getCacheVersion(compressionMethod) {
.digest("hex"); .digest("hex");
} }
exports.getCacheVersion = getCacheVersion; exports.getCacheVersion = getCacheVersion;
function retry(name, method, getStatusCode, maxAttempts = 2) {
return __awaiter(this, void 0, void 0, function* () {
let response = undefined;
let statusCode = undefined;
let isRetryable = false;
let errorMessage = "";
let attempt = 1;
while (attempt <= maxAttempts) {
try {
response = yield method();
statusCode = getStatusCode(response);
if (!isServerErrorStatusCode(statusCode)) {
return response;
}
isRetryable = isRetryableStatusCode(statusCode);
errorMessage = `Cache service responded with ${statusCode}`;
}
catch (error) {
isRetryable = true;
errorMessage = error.message;
}
core.debug(`${name} - Attempt ${attempt} of ${maxAttempts} failed with error: ${errorMessage}`);
if (!isRetryable) {
core.debug(`${name} - Error is not retryable`);
break;
}
attempt++;
}
throw Error(`${name} failed: ${errorMessage}`);
});
}
exports.retry = retry;
function retryTypedResponse(name, method, maxAttempts = 2) {
return __awaiter(this, void 0, void 0, function* () {
return yield retry(name, method, (response) => response.statusCode, maxAttempts);
});
}
exports.retryTypedResponse = retryTypedResponse;
function retryHttpClientResponse(name, method, maxAttempts = 2) {
return __awaiter(this, void 0, void 0, function* () {
return yield retry(name, method, (response) => response.message.statusCode, maxAttempts);
});
}
exports.retryHttpClientResponse = retryHttpClientResponse;
function getCacheEntry(keys, options) { function getCacheEntry(keys, options) {
var _a, _b; var _a, _b;
return __awaiter(this, void 0, void 0, function* () { return __awaiter(this, void 0, void 0, function* () {
const httpClient = createHttpClient(); const httpClient = createHttpClient();
const version = getCacheVersion((_a = options) === null || _a === void 0 ? void 0 : _a.compressionMethod); const version = getCacheVersion((_a = options) === null || _a === void 0 ? void 0 : _a.compressionMethod);
const resource = `cache?keys=${encodeURIComponent(keys.join(","))}&version=${version}`; const resource = `cache?keys=${encodeURIComponent(keys.join(","))}&version=${version}`;
const response = yield httpClient.getJson(getCacheApiUrl(resource)); const response = yield retryTypedResponse("getCacheEntry", () => httpClient.getJson(getCacheApiUrl(resource)));
if (response.statusCode === 204) { if (response.statusCode === 204) {
return null; return null;
} }
@ -2281,7 +2331,7 @@ function downloadCache(archiveLocation, archivePath) {
return __awaiter(this, void 0, void 0, function* () { return __awaiter(this, void 0, void 0, function* () {
const stream = fs.createWriteStream(archivePath); const stream = fs.createWriteStream(archivePath);
const httpClient = new http_client_1.HttpClient("actions/cache"); const httpClient = new http_client_1.HttpClient("actions/cache");
const downloadResponse = yield httpClient.get(archiveLocation); const downloadResponse = yield retryHttpClientResponse("downloadCache", () => httpClient.get(archiveLocation));
// Abort download if no traffic received over the socket. // Abort download if no traffic received over the socket.
downloadResponse.message.socket.setTimeout(constants_1.SocketTimeout, () => { downloadResponse.message.socket.setTimeout(constants_1.SocketTimeout, () => {
downloadResponse.message.destroy(); downloadResponse.message.destroy();
@ -2313,7 +2363,7 @@ function reserveCache(key, options) {
key, key,
version version
}; };
const response = yield httpClient.postJson(getCacheApiUrl("caches"), reserveCacheRequest); const response = yield retryTypedResponse("reserveCache", () => httpClient.postJson(getCacheApiUrl("caches"), reserveCacheRequest));
return _d = (_c = (_b = response) === null || _b === void 0 ? void 0 : _b.result) === null || _c === void 0 ? void 0 : _c.cacheId, (_d !== null && _d !== void 0 ? _d : -1); return _d = (_c = (_b = response) === null || _b === void 0 ? void 0 : _b.result) === null || _c === void 0 ? void 0 : _c.cacheId, (_d !== null && _d !== void 0 ? _d : -1);
}); });
} }
@ -2335,21 +2385,7 @@ function uploadChunk(httpClient, resourceUrl, openStream, start, end) {
"Content-Type": "application/octet-stream", "Content-Type": "application/octet-stream",
"Content-Range": getContentRange(start, end) "Content-Range": getContentRange(start, end)
}; };
const uploadChunkRequest = () => __awaiter(this, void 0, void 0, function* () { yield retryHttpClientResponse(`uploadChunk (start: ${start}, end: ${end})`, () => httpClient.sendStream("PATCH", resourceUrl, openStream(), additionalHeaders));
return yield httpClient.sendStream("PATCH", resourceUrl, openStream(), additionalHeaders);
});
const response = yield uploadChunkRequest();
if (isSuccessStatusCode(response.message.statusCode)) {
return;
}
if (isRetryableStatusCode(response.message.statusCode)) {
core.debug(`Received ${response.message.statusCode}, retrying chunk at offset ${start}.`);
const retryResponse = yield uploadChunkRequest();
if (isSuccessStatusCode(retryResponse.message.statusCode)) {
return;
}
}
throw new Error(`Cache service responded with ${response.message.statusCode} during chunk upload.`);
}); });
} }
function parseEnvNumber(key) { function parseEnvNumber(key) {
@ -2401,7 +2437,7 @@ function uploadFile(httpClient, cacheId, archivePath) {
function commitCache(httpClient, cacheId, filesize) { function commitCache(httpClient, cacheId, filesize) {
return __awaiter(this, void 0, void 0, function* () { return __awaiter(this, void 0, void 0, function* () {
const commitCacheRequest = { size: filesize }; const commitCacheRequest = { size: filesize };
return yield httpClient.postJson(getCacheApiUrl(`caches/${cacheId.toString()}`), commitCacheRequest); return yield retryTypedResponse("commitCache", () => httpClient.postJson(getCacheApiUrl(`caches/${cacheId.toString()}`), commitCacheRequest));
}); });
} }
function saveCache(cacheId, archivePath) { function saveCache(cacheId, archivePath) {

74
dist/save/index.js vendored
View file

@ -2197,6 +2197,12 @@ function isSuccessStatusCode(statusCode) {
} }
return statusCode >= 200 && statusCode < 300; return statusCode >= 200 && statusCode < 300;
} }
function isServerErrorStatusCode(statusCode) {
if (!statusCode) {
return true;
}
return statusCode >= 500;
}
function isRetryableStatusCode(statusCode) { function isRetryableStatusCode(statusCode) {
if (!statusCode) { if (!statusCode) {
return false; return false;
@ -2246,13 +2252,57 @@ function getCacheVersion(compressionMethod) {
.digest("hex"); .digest("hex");
} }
exports.getCacheVersion = getCacheVersion; exports.getCacheVersion = getCacheVersion;
function retry(name, method, getStatusCode, maxAttempts = 2) {
return __awaiter(this, void 0, void 0, function* () {
let response = undefined;
let statusCode = undefined;
let isRetryable = false;
let errorMessage = "";
let attempt = 1;
while (attempt <= maxAttempts) {
try {
response = yield method();
statusCode = getStatusCode(response);
if (!isServerErrorStatusCode(statusCode)) {
return response;
}
isRetryable = isRetryableStatusCode(statusCode);
errorMessage = `Cache service responded with ${statusCode}`;
}
catch (error) {
isRetryable = true;
errorMessage = error.message;
}
core.debug(`${name} - Attempt ${attempt} of ${maxAttempts} failed with error: ${errorMessage}`);
if (!isRetryable) {
core.debug(`${name} - Error is not retryable`);
break;
}
attempt++;
}
throw Error(`${name} failed: ${errorMessage}`);
});
}
exports.retry = retry;
function retryTypedResponse(name, method, maxAttempts = 2) {
return __awaiter(this, void 0, void 0, function* () {
return yield retry(name, method, (response) => response.statusCode, maxAttempts);
});
}
exports.retryTypedResponse = retryTypedResponse;
function retryHttpClientResponse(name, method, maxAttempts = 2) {
return __awaiter(this, void 0, void 0, function* () {
return yield retry(name, method, (response) => response.message.statusCode, maxAttempts);
});
}
exports.retryHttpClientResponse = retryHttpClientResponse;
function getCacheEntry(keys, options) { function getCacheEntry(keys, options) {
var _a, _b; var _a, _b;
return __awaiter(this, void 0, void 0, function* () { return __awaiter(this, void 0, void 0, function* () {
const httpClient = createHttpClient(); const httpClient = createHttpClient();
const version = getCacheVersion((_a = options) === null || _a === void 0 ? void 0 : _a.compressionMethod); const version = getCacheVersion((_a = options) === null || _a === void 0 ? void 0 : _a.compressionMethod);
const resource = `cache?keys=${encodeURIComponent(keys.join(","))}&version=${version}`; const resource = `cache?keys=${encodeURIComponent(keys.join(","))}&version=${version}`;
const response = yield httpClient.getJson(getCacheApiUrl(resource)); const response = yield retryTypedResponse("getCacheEntry", () => httpClient.getJson(getCacheApiUrl(resource)));
if (response.statusCode === 204) { if (response.statusCode === 204) {
return null; return null;
} }
@ -2281,7 +2331,7 @@ function downloadCache(archiveLocation, archivePath) {
return __awaiter(this, void 0, void 0, function* () { return __awaiter(this, void 0, void 0, function* () {
const stream = fs.createWriteStream(archivePath); const stream = fs.createWriteStream(archivePath);
const httpClient = new http_client_1.HttpClient("actions/cache"); const httpClient = new http_client_1.HttpClient("actions/cache");
const downloadResponse = yield httpClient.get(archiveLocation); const downloadResponse = yield retryHttpClientResponse("downloadCache", () => httpClient.get(archiveLocation));
// Abort download if no traffic received over the socket. // Abort download if no traffic received over the socket.
downloadResponse.message.socket.setTimeout(constants_1.SocketTimeout, () => { downloadResponse.message.socket.setTimeout(constants_1.SocketTimeout, () => {
downloadResponse.message.destroy(); downloadResponse.message.destroy();
@ -2313,7 +2363,7 @@ function reserveCache(key, options) {
key, key,
version version
}; };
const response = yield httpClient.postJson(getCacheApiUrl("caches"), reserveCacheRequest); const response = yield retryTypedResponse("reserveCache", () => httpClient.postJson(getCacheApiUrl("caches"), reserveCacheRequest));
return _d = (_c = (_b = response) === null || _b === void 0 ? void 0 : _b.result) === null || _c === void 0 ? void 0 : _c.cacheId, (_d !== null && _d !== void 0 ? _d : -1); return _d = (_c = (_b = response) === null || _b === void 0 ? void 0 : _b.result) === null || _c === void 0 ? void 0 : _c.cacheId, (_d !== null && _d !== void 0 ? _d : -1);
}); });
} }
@ -2335,21 +2385,7 @@ function uploadChunk(httpClient, resourceUrl, openStream, start, end) {
"Content-Type": "application/octet-stream", "Content-Type": "application/octet-stream",
"Content-Range": getContentRange(start, end) "Content-Range": getContentRange(start, end)
}; };
const uploadChunkRequest = () => __awaiter(this, void 0, void 0, function* () { yield retryHttpClientResponse(`uploadChunk (start: ${start}, end: ${end})`, () => httpClient.sendStream("PATCH", resourceUrl, openStream(), additionalHeaders));
return yield httpClient.sendStream("PATCH", resourceUrl, openStream(), additionalHeaders);
});
const response = yield uploadChunkRequest();
if (isSuccessStatusCode(response.message.statusCode)) {
return;
}
if (isRetryableStatusCode(response.message.statusCode)) {
core.debug(`Received ${response.message.statusCode}, retrying chunk at offset ${start}.`);
const retryResponse = yield uploadChunkRequest();
if (isSuccessStatusCode(retryResponse.message.statusCode)) {
return;
}
}
throw new Error(`Cache service responded with ${response.message.statusCode} during chunk upload.`);
}); });
} }
function parseEnvNumber(key) { function parseEnvNumber(key) {
@ -2401,7 +2437,7 @@ function uploadFile(httpClient, cacheId, archivePath) {
function commitCache(httpClient, cacheId, filesize) { function commitCache(httpClient, cacheId, filesize) {
return __awaiter(this, void 0, void 0, function* () { return __awaiter(this, void 0, void 0, function* () {
const commitCacheRequest = { size: filesize }; const commitCacheRequest = { size: filesize };
return yield httpClient.postJson(getCacheApiUrl(`caches/${cacheId.toString()}`), commitCacheRequest); return yield retryTypedResponse("commitCache", () => httpClient.postJson(getCacheApiUrl(`caches/${cacheId.toString()}`), commitCacheRequest));
}); });
} }
function saveCache(cacheId, archivePath) { function saveCache(cacheId, archivePath) {

View file

@ -30,6 +30,13 @@ function isSuccessStatusCode(statusCode?: number): boolean {
return statusCode >= 200 && statusCode < 300; return statusCode >= 200 && statusCode < 300;
} }
function isServerErrorStatusCode(statusCode?: number): boolean {
if (!statusCode) {
return true;
}
return statusCode >= 500;
}
function isRetryableStatusCode(statusCode?: number): boolean { function isRetryableStatusCode(statusCode?: number): boolean {
if (!statusCode) { if (!statusCode) {
return false; return false;
@ -99,6 +106,75 @@ export function getCacheVersion(compressionMethod?: CompressionMethod): string {
.digest("hex"); .digest("hex");
} }
export async function retry<T>(
name: string,
method: () => Promise<T>,
getStatusCode: (T) => number | undefined,
maxAttempts = 2
): Promise<T> {
let response: T | undefined = undefined;
let statusCode: number | undefined = undefined;
let isRetryable = false;
let errorMessage = "";
let attempt = 1;
while (attempt <= maxAttempts) {
try {
response = await method();
statusCode = getStatusCode(response);
if (!isServerErrorStatusCode(statusCode)) {
return response;
}
isRetryable = isRetryableStatusCode(statusCode);
errorMessage = `Cache service responded with ${statusCode}`;
} catch (error) {
isRetryable = true;
errorMessage = error.message;
}
core.debug(
`${name} - Attempt ${attempt} of ${maxAttempts} failed with error: ${errorMessage}`
);
if (!isRetryable) {
core.debug(`${name} - Error is not retryable`);
break;
}
attempt++;
}
throw Error(`${name} failed: ${errorMessage}`);
}
export async function retryTypedResponse<T>(
name: string,
method: () => Promise<ITypedResponse<T>>,
maxAttempts = 2
): Promise<ITypedResponse<T>> {
return await retry(
name,
method,
(response: ITypedResponse<T>) => response.statusCode,
maxAttempts
);
}
export async function retryHttpClientResponse<T>(
name: string,
method: () => Promise<IHttpClientResponse>,
maxAttempts = 2
): Promise<IHttpClientResponse> {
return await retry(
name,
method,
(response: IHttpClientResponse) => response.message.statusCode,
maxAttempts
);
}
export async function getCacheEntry( export async function getCacheEntry(
keys: string[], keys: string[],
options?: CacheOptions options?: CacheOptions
@ -109,8 +185,8 @@ export async function getCacheEntry(
keys.join(",") keys.join(",")
)}&version=${version}`; )}&version=${version}`;
const response = await httpClient.getJson<ArtifactCacheEntry>( const response = await retryTypedResponse("getCacheEntry", () =>
getCacheApiUrl(resource) httpClient.getJson<ArtifactCacheEntry>(getCacheApiUrl(resource))
); );
if (response.statusCode === 204) { if (response.statusCode === 204) {
return null; return null;
@ -145,7 +221,10 @@ export async function downloadCache(
): Promise<void> { ): Promise<void> {
const stream = fs.createWriteStream(archivePath); const stream = fs.createWriteStream(archivePath);
const httpClient = new HttpClient("actions/cache"); const httpClient = new HttpClient("actions/cache");
const downloadResponse = await httpClient.get(archiveLocation); const downloadResponse = await retryHttpClientResponse(
"downloadCache",
() => httpClient.get(archiveLocation)
);
// Abort download if no traffic received over the socket. // Abort download if no traffic received over the socket.
downloadResponse.message.socket.setTimeout(SocketTimeout, () => { downloadResponse.message.socket.setTimeout(SocketTimeout, () => {
@ -187,9 +266,11 @@ export async function reserveCache(
key, key,
version version
}; };
const response = await httpClient.postJson<ReserveCacheResponse>( const response = await retryTypedResponse("reserveCache", () =>
getCacheApiUrl("caches"), httpClient.postJson<ReserveCacheResponse>(
reserveCacheRequest getCacheApiUrl("caches"),
reserveCacheRequest
)
); );
return response?.result?.cacheId ?? -1; return response?.result?.cacheId ?? -1;
} }
@ -223,32 +304,15 @@ async function uploadChunk(
"Content-Range": getContentRange(start, end) "Content-Range": getContentRange(start, end)
}; };
const uploadChunkRequest = async (): Promise<IHttpClientResponse> => { await retryHttpClientResponse(
return await httpClient.sendStream( `uploadChunk (start: ${start}, end: ${end})`,
"PATCH", () =>
resourceUrl, httpClient.sendStream(
openStream(), "PATCH",
additionalHeaders resourceUrl,
); openStream(),
}; additionalHeaders
)
const response = await uploadChunkRequest();
if (isSuccessStatusCode(response.message.statusCode)) {
return;
}
if (isRetryableStatusCode(response.message.statusCode)) {
core.debug(
`Received ${response.message.statusCode}, retrying chunk at offset ${start}.`
);
const retryResponse = await uploadChunkRequest();
if (isSuccessStatusCode(retryResponse.message.statusCode)) {
return;
}
}
throw new Error(
`Cache service responded with ${response.message.statusCode} during chunk upload.`
); );
} }
@ -325,9 +389,11 @@ async function commitCache(
filesize: number filesize: number
): Promise<ITypedResponse<null>> { ): Promise<ITypedResponse<null>> {
const commitCacheRequest: CommitCacheRequest = { size: filesize }; const commitCacheRequest: CommitCacheRequest = { size: filesize };
return await httpClient.postJson<null>( return await retryTypedResponse("commitCache", () =>
getCacheApiUrl(`caches/${cacheId.toString()}`), httpClient.postJson<null>(
commitCacheRequest getCacheApiUrl(`caches/${cacheId.toString()}`),
commitCacheRequest
)
); );
} }