This article shows, step by step, how to set up the AWS Kinesis streaming services to store Split impression data in AWS S3 in JSON format.
The Split Admin site provides a webhook integration option that sends impression data as it is ingested into Split cloud; we will use this webhook to extract the impression data.
We will also use AWS API Gateway as the webhook endpoint. API Gateway forwards the POST request to AWS Lambda, which extracts the impression information and sends it to an AWS Kinesis Data Stream. The stream in turn feeds a Kinesis Firehose delivery stream, which has a built-in integration with S3.
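Each impression forwarded by the webhook is a JSON record; the fields used later in this article are key, split, environmentName, time, label, and treatment. An illustrative (not exhaustive) record, with made-up values:

```json
{
  "key": "user-1234",
  "split": "checkout_flow",
  "environmentName": "Production",
  "time": 1572470400000,
  "label": "default rule",
  "treatment": "on"
}
```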
Here are the steps to setup the integration:
1. The first step is to create the Kinesis Data Stream. Log in to AWS, select Services->Kinesis->Data Stream, and click the Create Kinesis stream button.
2. Enter the stream name and specify the number of shards based on the volume of impressions generated (consult an AWS expert for more information on shard sizing), then click Create Kinesis stream.
3. Once the Kinesis stream is created, select its record and click the Connect Kinesis consumers button.
4. Click the Connect to Delivery stream button.
5. Specify the Firehose delivery stream name and click Next.
6. On the Select a destination page, make sure S3 is selected, specify the S3 bucket for the target storage, and click Next.
7. Select the IAM role needed to access the service (consult an AWS expert for more information). In this example, we used a role that has the AmazonKinesisFullAccess and AmazonKinesisFirehoseFullAccess policies. Click Next, then click Create delivery stream.
8. The second service needed is AWS Lambda. Select Services->Lambda, click the Create function button, choose the Author from scratch option, and use Java 8 as the runtime.
9. The next step is to write the Java code for the Lambda function. We will use an Eclipse Maven project with the following dependencies:
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-lambda-java-core</artifactId>
    <version>1.0.0</version>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-core</artifactId>
    <version>1.11.671</version>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>1.11.2</version>
</dependency>
<dependency>
    <groupId>org.json</groupId>
    <artifactId>json</artifactId>
    <version>20180813</version>
</dependency>
10. Insert the code below into the class (the class name here is WebhookConsumer.java), making sure to replace awsAccessKey, awsSecretKey, kinesisStreamName, and awsRegion with your own values. The code converts the structure passed from the POST request into a list of Map objects, then loops through each impression block and sends a bulk put to the Kinesis stream.
package splitsupport;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.json.JSONObject;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.ClientConfigurationFactory;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.amazonaws.services.kinesis.model.PutRecordsResult;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

public class WebhookConsumer implements RequestHandler<Object, String> {
    @Override
    public String handleRequest(Object input, Context context) {
        String result = "";
        try {
            // Replace these values with your own credentials, region, and stream name.
            String awsAccessKey = "xxxxxxxxxxx";
            String awsSecretKey = "xxxxxxxxxxxxxxxxxxxxx";
            String awsRegion = "us-east-2";
            String kinesisStreamName = "SplitStream";

            // Build the Kinesis client from static credentials.
            BasicAWSCredentials awsCredentials = new BasicAWSCredentials(awsAccessKey, awsSecretKey);
            ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig();
            AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
            clientBuilder.setRegion(awsRegion);
            clientBuilder.setCredentials(new AWSStaticCredentialsProvider(awsCredentials));
            clientBuilder.setClientConfiguration(awsClientConfig);
            AmazonKinesis kinesisClient = clientBuilder.build();

            // Convert the POST body into a list of impression maps.
            List<Map<String, String>> impressions = StringToHash(input.toString());

            // Build one PutRecords entry per impression and send them in bulk.
            PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
            putRecordsRequest.setStreamName(kinesisStreamName);
            List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
            for (Map<String, String> imp : impressions) {
                JSONObject messageJson = new JSONObject();
                messageJson.put("key", imp.get("key"));
                messageJson.put("split", imp.get("split"));
                messageJson.put("environmentName", imp.get("environmentName"));
                messageJson.put("time", imp.get("time"));
                messageJson.put("label", imp.get("label"));
                messageJson.put("treatment", imp.get("treatment"));

                // Create a new entry for each record; reusing a single entry
                // would overwrite the data of records already added to the list.
                PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
                putRecordsRequestEntry.setData(ByteBuffer.wrap(messageJson.toString().getBytes()));
                putRecordsRequestEntry.setPartitionKey("partition-2");
                putRecordsRequestEntryList.add(putRecordsRequestEntry);
            }
            putRecordsRequest.setRecords(putRecordsRequestEntryList);
            PutRecordsResult putRecordsResult = kinesisClient.putRecords(putRecordsRequest);
            System.out.println("Put Result: " + putRecordsResult);
            result = putRecordsResult.toString();
        } catch (Exception e) {
            System.out.print("Exception: " + e.getMessage());
            return "Exception: " + e.getMessage();
        }
        return result;
    }

    // Parse the toString() form of a list of maps, e.g.
    // "[{key=user1, split=f1, treatment=on}, {key=user2, ...}]",
    // back into a List of Maps. The same loop handles one record or many.
    public static List<Map<String, String>> StringToHash(String decomp) {
        List<Map<String, String>> arrayRecord = new ArrayList<>();
        for (final String entry : decomp.split("},")) {
            Map<String, String> oneRecord = new HashMap<>();
            for (final String parts : entry.split(",")) {
                final String[] record = parts.split("=");
                String name = record[0].replace("[{", "").replace("{", "");
                String value = record[1].replace("}]", "");
                oneRecord.put(name.trim(), value.trim());
            }
            arrayRecord.add(oneRecord);
        }
        return arrayRecord;
    }
}
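Because the Lambda handler receives `Object input` and calls `input.toString()`, the parser works on the Java Map toString() form rather than raw JSON. The parsing logic can be checked locally without AWS; the class name and sample values below are illustrative:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ImpressionParserCheck {
    // Same parsing approach as WebhookConsumer.StringToHash: turns the
    // toString() form of a list of maps back into List<Map<String, String>>.
    public static List<Map<String, String>> stringToHash(String decomp) {
        List<Map<String, String>> arrayRecord = new ArrayList<>();
        for (final String entry : decomp.split("},")) {
            Map<String, String> oneRecord = new HashMap<>();
            for (final String parts : entry.split(",")) {
                final String[] record = parts.split("=");
                String name = record[0].replace("[{", "").replace("{", "");
                String value = record[1].replace("}]", "");
                oneRecord.put(name.trim(), value.trim());
            }
            arrayRecord.add(oneRecord);
        }
        return arrayRecord;
    }

    public static void main(String[] args) {
        String sample = "[{key=user1, split=checkout_flow, treatment=on}, "
                + "{key=user2, split=checkout_flow, treatment=off}]";
        List<Map<String, String>> impressions = stringToHash(sample);
        System.out.println(impressions.size());                  // 2
        System.out.println(impressions.get(1).get("treatment")); // off
    }
}
```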
11. Export a self-contained JAR file for your project. In Eclipse, right-click the project name and select Export.
12. Select the Runnable JAR File option and click Next, then specify the JAR file name and click Finish.
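If you prefer building from the command line instead of Eclipse's export wizard, the Maven shade plugin produces an equivalent self-contained JAR. A minimal pom.xml fragment (the plugin version shown is illustrative):

```xml
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```

Running mvn package then produces the bundled JAR in the target directory.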
13. Back on the AWS Lambda page, under the Function code section, update the Handler box with the content below:
splitsupport.WebhookConsumer::handleRequest
Then upload the newly created JAR file.
14. The third service needed is API Gateway. Select Services->API Gateway.
15. Click the Create API button, use the REST API and New API options, type a name for your API service, and click Create API.
16. Click your newly created API service, then use the Actions list to add a POST method execution. Select Lambda Function as the Integration type and type the name of the Lambda function created previously.
17. Since Split zips (gzip-compresses) the POST body when calling the webhook, we need to enable binary support for our API Gateway. Under Settings, add */* to Binary Media Types and click Save Changes.
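If your Lambda ever receives the raw gzipped bytes rather than an already-decoded object, they can be decompressed with the standard java.util.zip classes before parsing. A minimal sketch; the class name is illustrative, and the gzip helper exists only to simulate the compressed body in a local test:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipBodyCheck {
    // Compress a string to gzip bytes (used here only to simulate the zipped POST body).
    public static byte[] gzip(String text) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        }
        return bos.toByteArray();
    }

    // Decompress gzip bytes back into the original string.
    public static String gunzip(byte[] compressed) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] buffer = new byte[4096];
            int n;
            while ((n = gz.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
        }
        return out.toString(StandardCharsets.UTF_8.name());
    }

    public static void main(String[] args) throws IOException {
        byte[] body = gzip("[{\"key\":\"user1\",\"treatment\":\"on\"}]");
        System.out.println(gunzip(body));
    }
}
```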
18. The last step is to create a Stage for your API Gateway and generate the webhook URL: on the Stages page, click the Create button.
19. Your webhook service is now ready. Log in to the Split user interface, go to the Admin Settings page, click Integrations, and click the Add button for Outgoing Webhook (Impressions).
20. Select which environment's impressions to export, paste the API Gateway URL, and click Save.