diff --git a/pom.xml b/pom.xml
index a4a0c5b..fb8e514 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
4.0.0
io.parallec
parallec-core
- 0.9.4-beta
+ 0.10.6
jar
io.parallec:parallec-core
https://github.com/eBay/parallec
@@ -56,7 +56,7 @@
1.10
2.3
- 1.6.5
+ 1.9.40
2.3.3
2.3.3
diff --git a/src/main/java/io/parallec/core/ParallelClient.java b/src/main/java/io/parallec/core/ParallelClient.java
index 7fc75d8..921ce2e 100644
--- a/src/main/java/io/parallec/core/ParallelClient.java
+++ b/src/main/java/io/parallec/core/ParallelClient.java
@@ -12,20 +12,18 @@
*/
package io.parallec.core;
+import com.ning.http.client.AsyncHttpClient;
import io.parallec.core.actor.ActorConfig;
import io.parallec.core.monitor.MonitorProvider;
import io.parallec.core.resources.HttpClientStore;
import io.parallec.core.resources.HttpClientType;
import io.parallec.core.resources.HttpMethod;
-import io.parallec.core.resources.TcpSshPingResourceStore;
+import io.parallec.core.resources.TcpUdpSshPingResourceStore;
import io.parallec.core.task.ParallelTaskManager;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.ning.http.client.AsyncHttpClient;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
*
@@ -95,7 +93,7 @@ public class ParallelClient {
public HttpClientStore httpClientStore = HttpClientStore.getInstance();
/** The tcp client store. */
- public TcpSshPingResourceStore tcpSshPingResourceStore = TcpSshPingResourceStore.getInstance();
+ public TcpUdpSshPingResourceStore tcpSshPingResourceStore = TcpUdpSshPingResourceStore.getInstance();
/** The is closed is marked when all resources are released/not initialized. */
public static AtomicBoolean isClosed = new AtomicBoolean(true);
@@ -116,7 +114,6 @@ public void initialize() {
ActorConfig.createAndGetActorSystem();
httpClientStore.init();
tcpSshPingResourceStore.init();
- ParallelTaskManager.getInstance();
isClosed.set(false);
logger.info("Parallel Client Resources has been initialized.");
} else {
@@ -141,8 +138,8 @@ public void releaseExternalResources() {
ActorConfig.shutDownActorSystemForce();
httpClientStore.shutdown();
tcpSshPingResourceStore.shutdown();
- taskManager.cleanWaitTaskQueue();
- taskManager.cleanInprogressJobMap();
+ cleanWaitTaskQueue();
+ cleanInprogressJobMap();
isClosed.set(true);
logger.info("Have released all ParallelClient resources "
+ "(actor system + async+sync http client + task queue)"
@@ -170,7 +167,7 @@ public void reinitIfClosed() {
} catch (InterruptedException e) {
logger.error("error reinit httpClientStore", e);
}
- isClosed.set(true);
+ isClosed.set(false);
logger.info("Parallel Client Resources has been reinitialized.");
} else {
logger.debug("NO OP. Resource was not released.");
@@ -215,6 +212,21 @@ public ParallelTaskBuilder prepareTcp(String command) {
cb.getTcpMeta().setCommand(command);
return cb;
}
+
+ /**
+ * Prepare a parallel UDP Task.
+ *
+ * @param command
+ * the command
+ * @return the parallel task builder
+ */
+ public ParallelTaskBuilder prepareUdp(String command) {
+ reinitIfClosed();
+ ParallelTaskBuilder cb = new ParallelTaskBuilder();
+ cb.setProtocol(RequestProtocol.UDP);
+ cb.getUdpMeta().setCommand(command);
+ return cb;
+ }
/**
* Prepare a parallel HTTP GET Task.
@@ -310,7 +322,52 @@ public ParallelTaskBuilder prepareHttpOptions(String url) {
return cb;
}
+ /**
+ * Prepare a parallel HTTP Trace Task.
+ *
+ * @param url
+ * the UrlPostfix: e.g. in http://localhost:8080/index.html.,the url is "/index.html"
+ * @return the parallel task builder
+ */
+ public ParallelTaskBuilder prepareHttpTrace(String url) {
+ reinitIfClosed();
+ ParallelTaskBuilder cb = new ParallelTaskBuilder();
+ cb.getHttpMeta().setHttpMethod(HttpMethod.TRACE);
+ cb.getHttpMeta().setRequestUrlPostfix(url);
+ return cb;
+
+ }
+
+ /**
+ * Prepare a parallel HTTP Connect Task.
+ *
+ * @param url
+ * the UrlPostfix: e.g. in http://localhost:8080/index.html.,the url is "/index.html"
+ * @return the parallel task builder
+ */
+ public ParallelTaskBuilder prepareHttpConnect(String url) {
+ reinitIfClosed();
+ ParallelTaskBuilder cb = new ParallelTaskBuilder();
+ cb.getHttpMeta().setHttpMethod(HttpMethod.CONNECT);
+ cb.getHttpMeta().setRequestUrlPostfix(url);
+ return cb;
+
+ }
+ /**
+ * Prepare a parallel HTTP PATCH Task.
+ *
+ * @param url
+ * the UrlPostfix: e.g. in http://localhost:8080/index.html.,the url is "/index.html"
+ * @return the parallel task builder
+ */
+ public ParallelTaskBuilder prepareHttpPatch(String url) {
+ reinitIfClosed();
+ ParallelTaskBuilder cb = new ParallelTaskBuilder();
+ cb.getHttpMeta().setHttpMethod(HttpMethod.PATCH);
+ cb.getHttpMeta().setRequestUrlPostfix(url);
+ return cb;
+ }
/**
* Sets the custom fast client in the httpClientStore.
*
diff --git a/src/main/java/io/parallec/core/actor/HttpWorker.java b/src/main/java/io/parallec/core/actor/HttpWorker.java
index 6560ca9..2105e06 100644
--- a/src/main/java/io/parallec/core/actor/HttpWorker.java
+++ b/src/main/java/io/parallec/core/actor/HttpWorker.java
@@ -14,6 +14,8 @@
import io.parallec.core.actor.message.ResponseOnSingeRequest;
import io.parallec.core.actor.message.type.RequestWorkerMsgType;
+import io.parallec.core.bean.ResponseHeaderMeta;
+import io.parallec.core.config.ParallecGlobalConfig;
import io.parallec.core.exception.ActorMessageTypeInvalidException;
import io.parallec.core.exception.HttpRequestCreateException;
import io.parallec.core.resources.HttpMethod;
@@ -25,6 +27,9 @@
import java.io.IOException;
import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -44,9 +49,9 @@
import com.ning.http.client.AsyncHttpClient.BoundRequestBuilder;
import com.ning.http.client.ListenableFuture;
import com.ning.http.client.Response;
+import com.ning.http.util.AsyncHttpProviderUtils;
-
-
+// TODO: Auto-generated Javadoc
/**
* This is an akka actor with async http client.
*
@@ -73,9 +78,14 @@ public class HttpWorker extends UntypedActor {
private static Logger logger = LoggerFactory.getLogger(HttpWorker.class);
/** The http header map. */
- // 20140310
private final Map httpHeaderMap = new HashMap();
+ /**
+ * The response header meta: which keys are needed to get from response
+ * header.
+ */
+ private final ResponseHeaderMeta responseHeaderMeta;
+
/** The sender. */
private ActorRef sender = null;
@@ -98,7 +108,7 @@ public class HttpWorker extends UntypedActor {
/** The response future. */
ListenableFuture responseFuture = null;
-
+
/**
* Instantiates a new http worker.
*
@@ -108,11 +118,13 @@ public class HttpWorker extends UntypedActor {
* @param httpMethod the http method
* @param postData the post data
* @param httpHeaderMap the http header map
+ * @param responseHeaderMeta the response header meta
*/
public HttpWorker(final int actorMaxOperationTimeoutSec,
final AsyncHttpClient client, final String requestUrl,
final HttpMethod httpMethod, final String postData,
- final Map httpHeaderMap
+ final Map httpHeaderMap,
+ final ResponseHeaderMeta responseHeaderMeta
) {
this.actorMaxOperationTimeoutSec = actorMaxOperationTimeoutSec;
@@ -122,6 +134,7 @@ public HttpWorker(final int actorMaxOperationTimeoutSec,
this.postData = postData;
if (httpHeaderMap != null)
this.httpHeaderMap.putAll(httpHeaderMap);
+ this.responseHeaderMeta = responseHeaderMeta;
}
@@ -159,6 +172,17 @@ public BoundRequestBuilder createRequest()
case DELETE:
builder = client.prepareDelete(requestUrl);
break;
+ case TRACE:
+ builder = client.prepareTrace(requestUrl);
+ break;
+ case CONNECT:
+ builder = client.prepareConnect(requestUrl);
+ break;
+ case PATCH:
+ builder = client.preparePatch(requestUrl);
+ break;
+
+
default:
break;
}
@@ -233,7 +257,7 @@ public void onReceive(Object message) throws Exception {
sender = getSender();
reply(null, true, PcConstants.REQUEST_CANCELED,
PcConstants.REQUEST_CANCELED, PcConstants.NA,
- PcConstants.NA_INT);
+ PcConstants.NA_INT, null);
break;
case PROCESS_ON_EXCEPTION:
@@ -243,7 +267,7 @@ public void onReceive(Object message) throws Exception {
String stackTrace = PcStringUtils.printStackTrace(cause);
cancelCancellable();
reply(null, true, errorSummary, stackTrace, PcConstants.NA,
- PcConstants.NA_INT);
+ PcConstants.NA_INT, null);
break;
@@ -258,7 +282,7 @@ public void onReceive(Object message) throws Exception {
actorMaxOperationTimeoutSec);
reply(null, true, errorMsg, errorMsg, PcConstants.NA,
- PcConstants.NA_INT);
+ PcConstants.NA_INT, null);
break;
case CHECK_FUTURE_STATE:
@@ -317,43 +341,69 @@ public void cancelCancellable() {
* the status code
* @param statusCodeInt
* the status code int
+ * @param responseHeaders
+ * the response headers
*/
private void reply(final String response, final boolean error,
final String errorMessage, final String stackTrace,
- final String statusCode, final int statusCodeInt) {
+ final String statusCode, final int statusCodeInt,
+ Map> responseHeaders) {
if (!sentReply) {
- //must update sentReply first to avoid duplicated msg.
+ // must update sentReply first to avoid duplicated msg.
sentReply = true;
+
final ResponseOnSingeRequest res = new ResponseOnSingeRequest(
response, error, errorMessage, stackTrace, statusCode,
- statusCodeInt, PcDateUtils.getNowDateTimeStrStandard());
+ statusCodeInt, PcDateUtils.getNowDateTimeStrStandard(),
+ responseHeaders);
if (!getContext().system().deadLetters().equals(sender)) {
sender.tell(res, getSelf());
}
-
- getContext().stop(getSelf());
+ if (getContext() != null) {
+ getContext().stop(getSelf());
+ }
}
}
/**
* On complete.
+ * Save response headers when needed.
*
* @param response
* the response
- * @return the response on singe request
+ * @return the response on single request
*/
public ResponseOnSingeRequest onComplete(Response response) {
cancelCancellable();
try {
+ Map> responseHeaders = null;
+ if (responseHeaderMeta != null) {
+ responseHeaders = new LinkedHashMap>();
+ if (responseHeaderMeta.isGetAll()) {
+ for (Map.Entry> header : response
+ .getHeaders()) {
+ responseHeaders.put(header.getKey().toLowerCase(Locale.ROOT), header.getValue());
+ }
+ } else {
+ for (String key : responseHeaderMeta.getKeys()) {
+ if (response.getHeaders().containsKey(key)) {
+ responseHeaders.put(key.toLowerCase(Locale.ROOT),
+ response.getHeaders().get(key));
+ }
+ }
+ }
+ }
int statusCodeInt = response.getStatusCode();
String statusCode = statusCodeInt + " " + response.getStatusText();
-
- reply(response.getResponseBody(), false, null, null, statusCode,
- statusCodeInt);
+ String charset = ParallecGlobalConfig.httpResponseBodyCharsetUsesResponseContentType ?
+ AsyncHttpProviderUtils.parseCharset(response.getContentType())
+ : ParallecGlobalConfig.httpResponseBodyDefaultCharset;
+ reply(response.getResponseBody(charset), false, null, null, statusCode,
+ statusCodeInt, responseHeaders);
} catch (IOException e) {
getLogger().error("fail response.getResponseBody " + e);
}
diff --git a/src/main/java/io/parallec/core/resources/AsyncHttpClientFactoryEmbed.java b/src/main/java/io/parallec/core/resources/AsyncHttpClientFactoryEmbed.java
index 34a03bd..498cdc1 100644
--- a/src/main/java/io/parallec/core/resources/AsyncHttpClientFactoryEmbed.java
+++ b/src/main/java/io/parallec/core/resources/AsyncHttpClientFactoryEmbed.java
@@ -67,26 +67,16 @@ public AsyncHttpClientFactoryEmbed() {
// create and configure async http client
AsyncHttpClientConfigBean configFastClient = new AsyncHttpClientConfigBean();
- logger.info(
- "FastClient: ningFastClientConnectionTimeoutMillis: {}",
- ParallecGlobalConfig.ningFastClientConnectionTimeoutMillis);
- configFastClient
- .setConnectionTimeOutInMs(ParallecGlobalConfig.ningFastClientConnectionTimeoutMillis);
-
- logger.info("FastClient: ningFastClientRequestTimeoutMillis: {}",
- ParallecGlobalConfig.ningFastClientRequestTimeoutMillis);
- configFastClient
- .setRequestTimeoutInMs(ParallecGlobalConfig.ningFastClientRequestTimeoutMillis);
- fastClient = new AsyncHttpClient(configFastClient);
-
+ configFastClient.setConnectionTimeOut(ParallecGlobalConfig.ningFastClientConnectionTimeoutMillis);
+ configFastClient.setRequestTimeout(ParallecGlobalConfig.ningFastClientRequestTimeoutMillis);
+ fastClient = new AsyncHttpClient(configFastClient);
+
// TODO added
// configFastClient.setMaxRequestRetry(3);
AsyncHttpClientConfigBean configSlowClient = new AsyncHttpClientConfigBean();
- configSlowClient
- .setConnectionTimeOutInMs(ParallecGlobalConfig.ningSlowClientConnectionTimeoutMillis);
- configSlowClient
- .setRequestTimeoutInMs(ParallecGlobalConfig.ningSlowClientRequestTimeoutMillis);
+ configSlowClient.setConnectionTimeOut(ParallecGlobalConfig.ningSlowClientConnectionTimeoutMillis);
+ configSlowClient.setRequestTimeout(ParallecGlobalConfig.ningSlowClientRequestTimeoutMillis);
slowClient = new AsyncHttpClient(configSlowClient);
disableCertificateVerification();
@@ -146,17 +136,11 @@ public boolean verify(final String hostname,
HttpsURLConnection.setDefaultHostnameVerifier(verifier);
}
- /**
- * dummy.
- */
- public void state() {
- logger.info("Initializing Default AHC CLient Factory...");
- }
/**
* class CustomTrustManager.
*/
- private static class CustomTrustManager implements X509TrustManager {
+ public static class CustomTrustManager implements X509TrustManager {
/**
* Gets the accepted issuers.
@@ -193,4 +177,4 @@ public void checkServerTrusted(final X509Certificate[] certs,
/* no op */
}
}
-}
\ No newline at end of file
+}
diff --git a/src/main/java/io/parallec/core/resources/HttpMethod.java b/src/main/java/io/parallec/core/resources/HttpMethod.java
index 8c1561a..449cfdd 100644
--- a/src/main/java/io/parallec/core/resources/HttpMethod.java
+++ b/src/main/java/io/parallec/core/resources/HttpMethod.java
@@ -30,6 +30,12 @@ public enum HttpMethod {
DELETE,
/** The options. */
OPTIONS,
+ /** The TRACE. */
+ TRACE,
+ /** The CONNECT. */
+ CONNECT,
+ /** The PATCH. */
+ PATCH,
/** The na. */
NA
}
diff --git a/src/test/java/io/parallec/core/main/http/ParallelClientHttpConnectTest.java b/src/test/java/io/parallec/core/main/http/ParallelClientHttpConnectTest.java
new file mode 100644
index 0000000..a378c4a
--- /dev/null
+++ b/src/test/java/io/parallec/core/main/http/ParallelClientHttpConnectTest.java
@@ -0,0 +1,52 @@
+package io.parallec.core.main.http;
+
+import java.util.Map;
+
+import io.parallec.core.ParallecResponseHandler;
+import io.parallec.core.ParallelClient;
+import io.parallec.core.ParallelTask;
+import io.parallec.core.TestBase;
+import io.parallec.core.ResponseOnSingleTask;
+import io.parallec.core.util.PcStringUtils;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ParallelClientHttpConnectTest extends TestBase{
+ private static ParallelClient pc;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ pc = new ParallelClient();
+ }
+
+ @AfterClass
+ public static void shutdown() throws Exception {
+ pc.releaseExternalResources();
+ }
+ @Test
+ public void hitConnectTest() {
+
+ ParallelTask pt = pc
+ .prepareHttpConnect("")
+ .setConcurrency(1000)
+ .setSaveResponseToTask(true)
+ .setTargetHostsFromLineByLineText(FILEPATH_TOP_100,
+ SOURCE_LOCAL).execute(new ParallecResponseHandler() {
+
+ @Override
+ public void onCompleted(ResponseOnSingleTask res,
+ Map responseContext) {
+ logger.info("Responose Code:" + res.getStatusCode()
+ + " host: " + res.getHost()+ " Reponse Content: " +res.getResponseContent());
+ }
+ });
+ logger.info("Result Summary\n{}",
+ PcStringUtils.renderJson(pt.getAggregateResultFullSummary()));
+
+ pt.saveLogToLocal();
+ }
+
+
+}
diff --git a/src/test/java/io/parallec/core/main/http/ParallelClientHttpPatchTest.java b/src/test/java/io/parallec/core/main/http/ParallelClientHttpPatchTest.java
new file mode 100644
index 0000000..6760d1d
--- /dev/null
+++ b/src/test/java/io/parallec/core/main/http/ParallelClientHttpPatchTest.java
@@ -0,0 +1,52 @@
+package io.parallec.core.main.http;
+
+import java.util.Map;
+
+import io.parallec.core.ParallecResponseHandler;
+import io.parallec.core.ParallelClient;
+import io.parallec.core.ParallelTask;
+import io.parallec.core.TestBase;
+import io.parallec.core.ResponseOnSingleTask;
+import io.parallec.core.util.PcStringUtils;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ParallelClientHttpPatchTest extends TestBase{
+ private static ParallelClient pc;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ pc = new ParallelClient();
+ }
+
+ @AfterClass
+ public static void shutdown() throws Exception {
+ pc.releaseExternalResources();
+ }
+ @Test
+ public void hitPatchTest() {
+
+ ParallelTask pt = pc
+ .prepareHttpPatch("")
+ .setConcurrency(1000)
+ .setSaveResponseToTask(true)
+ .setTargetHostsFromLineByLineText(FILEPATH_TOP_100,
+ SOURCE_LOCAL).execute(new ParallecResponseHandler() {
+
+ @Override
+ public void onCompleted(ResponseOnSingleTask res,
+ Map responseContext) {
+ logger.info("Responose Code:" + res.getStatusCode()
+ + " host: " + res.getHost()+ " Reponse Content: " +res.getResponseContent());
+ }
+ });
+ logger.info("Result Summary\n{}",
+ PcStringUtils.renderJson(pt.getAggregateResultFullSummary()));
+
+ pt.saveLogToLocal();
+ }
+
+
+}
diff --git a/src/test/java/io/parallec/core/main/http/ParallelClientHttpTraceTest.java b/src/test/java/io/parallec/core/main/http/ParallelClientHttpTraceTest.java
new file mode 100644
index 0000000..f47970e
--- /dev/null
+++ b/src/test/java/io/parallec/core/main/http/ParallelClientHttpTraceTest.java
@@ -0,0 +1,52 @@
+package io.parallec.core.main.http;
+
+import java.util.Map;
+
+import io.parallec.core.ParallecResponseHandler;
+import io.parallec.core.ParallelClient;
+import io.parallec.core.ParallelTask;
+import io.parallec.core.TestBase;
+import io.parallec.core.ResponseOnSingleTask;
+import io.parallec.core.util.PcStringUtils;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ParallelClientHttpTraceTest extends TestBase{
+ private static ParallelClient pc;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ pc = new ParallelClient();
+ }
+
+ @AfterClass
+ public static void shutdown() throws Exception {
+ pc.releaseExternalResources();
+ }
+ @Test
+ public void hitTraceTest() {
+
+ ParallelTask pt = pc
+ .prepareHttpTrace("")
+ .setConcurrency(1000)
+ .setSaveResponseToTask(true)
+ .setTargetHostsFromLineByLineText(FILEPATH_TOP_100,
+ SOURCE_LOCAL).execute(new ParallecResponseHandler() {
+
+ @Override
+ public void onCompleted(ResponseOnSingleTask res,
+ Map responseContext) {
+ logger.info("Responose Code:" + res.getStatusCode()
+ + " host: " + res.getHost()+ " Reponse Content: " +res.getResponseContent());
+ }
+ });
+ logger.info("Result Summary\n{}",
+ PcStringUtils.renderJson(pt.getAggregateResultFullSummary()));
+
+ pt.saveLogToLocal();
+ }
+
+
+}