Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/main/java/org/influxdb/InfluxDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,25 @@ public void write(final String database, final String retentionPolicy,
public void query(Query query, int chunkSize, BiConsumer<Cancellable, QueryResult> onNext, Runnable onComplete,
Consumer<Throwable> onFailure);

/**
* Execute a streaming query against a database.
*
* @param query
* the query to execute.
* @param timeUnit
* the time unit of the results.
* @param chunkSize
* the number of QueryResults to process in one chunk.
* @param onNext
* the consumer to invoke for each received QueryResult; with capability to discontinue a streaming query
* @param onComplete
* the onComplete to invoke for successfully end of stream
* @param onFailure
* the consumer for error handling
*/
public void query(Query query, TimeUnit timeUnit, int chunkSize, BiConsumer<Cancellable, QueryResult> onNext, Runnable onComplete,
Consumer<Throwable> onFailure);

/**
* Execute a query against a database.
*
Expand Down
84 changes: 84 additions & 0 deletions src/main/java/org/influxdb/impl/InfluxDBImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,90 @@ public void onFailure(final Call<ResponseBody> call, final Throwable t) {
});
}

/**
* {@inheritDoc}
*/
@Override
public void query(final Query query, final TimeUnit timeUnit, final int chunkSize, final BiConsumer<Cancellable, QueryResult> onNext,
final Runnable onComplete, final Consumer<Throwable> onFailure) {
Call<ResponseBody> call;
if (query.hasBoundParameters()) {
if (query.requiresPost()) {
call = this.influxDBService.postQuery(getDatabase(query), query.getCommandWithUrlEncoded(), TimeUtil.toTimePrecision(timeUnit), chunkSize,
query.getParameterJsonWithUrlEncoded());
} else {
call = this.influxDBService.query(getDatabase(query), query.getCommandWithUrlEncoded(), TimeUtil.toTimePrecision(timeUnit), chunkSize,
query.getParameterJsonWithUrlEncoded());
}
} else {
if (query.requiresPost()) {
call = this.influxDBService.postQuery(getDatabase(query), query.getCommandWithUrlEncoded(), TimeUtil.toTimePrecision(timeUnit), chunkSize);
} else {
call = this.influxDBService.query(getDatabase(query), query.getCommandWithUrlEncoded(), TimeUtil.toTimePrecision(timeUnit), chunkSize);
}
}

call.enqueue(new Callback<ResponseBody>() {
@Override
public void onResponse(final Call<ResponseBody> call, final Response<ResponseBody> response) {

Cancellable cancellable = new Cancellable() {
@Override
public void cancel() {
call.cancel();
}

@Override
public boolean isCanceled() {
return call.isCanceled();
}
};

try {
if (response.isSuccessful()) {
ResponseBody chunkedBody = response.body();
chunkProccesor.process(chunkedBody, cancellable, onNext, onComplete);
} else {
// REVIEW: must be handled consistently with IOException.
ResponseBody errorBody = response.errorBody();
if (errorBody != null) {
InfluxDBException influxDBException = new InfluxDBException(errorBody.string());
if (onFailure == null) {
throw influxDBException;
} else {
onFailure.accept(influxDBException);
}
}
}
} catch (IOException e) {
QueryResult queryResult = new QueryResult();
queryResult.setError(e.toString());
onNext.accept(cancellable, queryResult);
//passing null onFailure consumer is here for backward compatibility
//where the empty queryResult containing error is propagating into onNext consumer
if (onFailure != null) {
onFailure.accept(e);
}
} catch (Exception e) {
call.cancel();
if (onFailure != null) {
onFailure.accept(e);
}
}

}

@Override
public void onFailure(final Call<ResponseBody> call, final Throwable t) {
if (onFailure == null) {
throw new InfluxDBException(t);
} else {
onFailure.accept(t);
}
}
});
}

/**
* {@inheritDoc}
*/
Expand Down
22 changes: 22 additions & 0 deletions src/main/java/org/influxdb/impl/InfluxDBService.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,24 @@ public Call<QueryResult> postQuery(@Query(DB) String db, @Query(EPOCH) String ep
public Call<ResponseBody> postQuery(@Query(DB) String db, @Field(value = Q, encoded = true) String query,
@Query(CHUNK_SIZE) int chunkSize);

@Streaming
@POST("query?chunked=true")
@FormUrlEncoded
public Call<ResponseBody> postQuery(@Query(DB) String db, @Field(value = Q, encoded = true) String query, @Query(EPOCH) String epoch,
@Query(CHUNK_SIZE) int chunkSize);

@Streaming
@POST("query?chunked=true")
@FormUrlEncoded
public Call<ResponseBody> postQuery(@Query(DB) String db, @Field(value = Q, encoded = true) String query,
@Query(CHUNK_SIZE) int chunkSize, @Query(value = PARAMS, encoded = true) String params);

@Streaming
@POST("query?chunked=true")
@FormUrlEncoded
public Call<ResponseBody> postQuery(@Query(DB) String db, @Field(value = Q, encoded = true) String query, @Query(EPOCH) String epoch,
@Query(CHUNK_SIZE) int chunkSize, @Query(value = PARAMS, encoded = true) String params);

@POST("query")
@FormUrlEncoded
public Call<QueryResult> postQuery(@Field(value = Q, encoded = true) String query);
Expand All @@ -90,8 +102,18 @@ public Call<ResponseBody> postQuery(@Query(DB) String db, @Field(value = Q, enco
public Call<ResponseBody> query(@Query(DB) String db, @Query(value = Q, encoded = true) String query,
@Query(CHUNK_SIZE) int chunkSize);

@Streaming
@GET("query?chunked=true")
public Call<ResponseBody> query(@Query(DB) String db, @Query(value = Q, encoded = true) String query, @Query(EPOCH) String epoch,
@Query(CHUNK_SIZE) int chunkSize);

@Streaming
@GET("query?chunked=true")
public Call<ResponseBody> query(@Query(DB) String db, @Query(value = Q, encoded = true) String query,
@Query(CHUNK_SIZE) int chunkSize, @Query(value = PARAMS, encoded = true) String params);

@Streaming
@GET("query?chunked=true")
public Call<ResponseBody> query(@Query(DB) String db, @Query(value = Q, encoded = true) String query, @Query(EPOCH) String epoch,
@Query(CHUNK_SIZE) int chunkSize, @Query(value = PARAMS, encoded = true) String params);
}
69 changes: 64 additions & 5 deletions src/test/java/org/influxdb/InfluxDBTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,7 @@
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
Expand All @@ -37,6 +33,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -1158,6 +1155,68 @@ public void testChunkingOnComplete() throws InterruptedException {
Assertions.assertTrue(await, "The onComplete action did not arrive!");
}

/**
* Test chunking with TimeUnit
* @throws InterruptedException
*/
@Test
public void testChunkingWithImeUnit() throws InterruptedException {
if (this.influxDB.version().startsWith("0.") || this.influxDB.version().startsWith("1.0")) {
// do not test version 0.13 and 1.0
return;
}

String dbName = "write_unittest_" + System.currentTimeMillis();
this.influxDB.query(new Query("CREATE DATABASE " + dbName));
String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version());
BatchPoints batchPoints = BatchPoints.database(dbName).retentionPolicy(rp).build();
Point point1 = Point.measurement("disk").tag("atag", "a").addField("used", 60L).addField("free", 1L).build();
Point point2 = Point.measurement("disk").tag("atag", "b").addField("used", 70L).addField("free", 2L).build();
Point point3 = Point.measurement("disk").tag("atag", "c").addField("used", 80L).addField("free", 3L).build();
batchPoints.point(point1);
batchPoints.point(point2);
batchPoints.point(point3);
this.influxDB.write(batchPoints);

CountDownLatch countDownLatch = new CountDownLatch(1);

Thread.sleep(2000);
Query query = new Query("SELECT * FROM disk", dbName);
this.influxDB.query(query, 2, result -> {}, countDownLatch::countDown);
List<QueryResult> results = new ArrayList<>();
AtomicReference<Throwable> errorFound = new AtomicReference<>();

// Run and map to points
this.influxDB.query(
query,
TimeUnit.MILLISECONDS,
5000,
(cancellable, queryResult) -> results.add(queryResult),
countDownLatch::countDown,
throwable -> {
countDownLatch.countDown();
errorFound.set(throwable);
}
);

Thread.sleep(2000);
this.influxDB.query(new Query("DROP DATABASE " + dbName));

boolean await = countDownLatch.await(10, TimeUnit.SECONDS);
Assertions.assertTrue(await, "The onComplete action did not arrive!");
Assertions.assertNull(errorFound.get(), "An error occurred : " + errorFound.get());

long totalPoints = results.stream()
.filter(qr -> qr.getResults() != null)
.flatMap(qr -> qr.getResults().stream())
.filter(r -> r.getSeries() != null)
.flatMap(r -> r.getSeries().stream())
.filter(s -> s.getValues() != null)
.mapToLong(s -> s.getValues().size())
.sum();
Assertions.assertEquals(3, totalPoints);
}

@Test
public void testChunkingFailOnComplete() throws InterruptedException {
if (this.influxDB.version().startsWith("0.") || this.influxDB.version().startsWith("1.0")) {
Expand Down
Loading