Dockerizing Elasticsearch to Run on Amazon EC2 Container Service

Recently I spent almost two days trying to run Elasticsearch in a Docker container hosted on EC2 Container Service (ECS). A word of caution: running an IO-heavy app like ES inside a container like Docker might not be a good idea, especially if you can't use native networking and have to deal with the abstracted bridge mode (currently ECS only supports bridged networking). However, our purpose is to provide search capability over a small data set (fewer than 500K documents) and we were not concerned about the IO overhead; the maintenance benefit outweighs the performance penalty, so it is a good fit for our requirement. Also, if you want to use these scripts for a large volume of data, you will have to map larger disk space on the host instances (which is not done here).

While I am not going to describe step by step what ECS and Docker are (there is plenty of material from Amazon on the topic), I'll focus on sharing the code and projects that make up a working Elasticsearch image for Docker. I had to build my own Docker image for Elasticsearch because the default image that comes from Elasticsearch runs on OpenJDK and I wanted the Oracle JDK.

Without further ado, here is the GitHub project for the Elasticsearch image:

https://github.com/sajid2045/elasticsearch

And the Elasticsearch-ECS container (with some basic plugins and the elastic-ec2 plugin installed):

https://github.com/sajid2045/elasticsearch-ecs

I have set up automated builds on Docker Hub, so feel free to use the images directly from there:

https://hub.docker.com/r/sajid2045/elasticsearch/

And

https://hub.docker.com/r/sajid2045/elasticsearch-ecs/

Now, a note of credit: most of the work is adapted from this blog post:

http://blog.dmcquay.com/devops/2015/09/12/running-elasticsearch-on-aws-ecs.html

However, I faced some issues, mostly because I am running Elasticsearch 2.2. You should still read through that post to understand the internal working principles; it is so nicely detailed that I am not going to repeat them here.

Cloud Formation:
I have used a custom CloudFormation script to make sure my instances end up in the VPC structure specific to my organization; you can use the default ECS CloudFormation template if you just want to try out the feature.

Feel free to look at it if you have a similar requirement: https://raw.githubusercontent.com/sajid2045/elasticsearch-ecs/master/cloud-formation-template.json

With the ready-made Docker images and the recipe for how to use them, you should have a much smoother time setting it all up. Best of luck 🙂

Using a Graph DB to Manage Dynamic Subscriptions

Recently we added functionality for users to subscribe to their sports of interest so we can send them more personalized content. Given the dynamic nature of sports, matches and fixtures, I wanted the subscription topics to be flexible but still hierarchical. The idea was to enable someone to get each and every notification that happens in cricket, or to choose a particular team or player and get more granular notifications.

After trying to model the hierarchy in an RDBMS and a document DB, we quickly realized a traditional DB is just a bad fit for such a recursive model, and the data retrieval queries would be too expensive.

This was finally a problem big enough to sell and justify introducing a completely new tool, and my chance to use a graph DB in production. I was introduced to the graph DB concept on a previous project of mine and was really impressed by how naturally the domain model maps onto the DB via the graph concept.

After some basic R&D, we decided to go with OrientDB. Having used Elasticsearch cluster products, OrientDB's clustering model seemed more natural to us than Neo4j's. Having said that, running a graph on a distributed cluster is not an easy problem, which we realized later the hard way.

We could easily map our model onto a graph schema like the following:

[Image: graph1 – the subscription topic graph schema]

Notice how easily the concepts of an unregistered device/user, and a user with multiple devices, fit into the domain. A user can subscribe to any Topic node in the tree, and events automatically flow through to the subscribers.

This is how it looks in the database:

[Image: sample – the graph as stored in OrientDB]

Performance:

With the default setup and no special tuning, the system looks up the 10,000 device IDs for a notification push in under 500 ms, which is quite good for us. However, OrientDB support tells us it can be made even faster.
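That lookup is essentially a single traversal from the topic node out to the devices, which is why it is cheap. As a rough illustration, here is a minimal sketch using the TinkerPop 2 Blueprints/Gremlin Java API against OrientDB; the edge labels (subscribes, owns, parent) and the deviceId property are hypothetical stand-ins, not our production schema:

import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Vertex;
import com.tinkerpop.blueprints.impls.orient.OrientGraph;
import com.tinkerpop.gremlin.java.GremlinPipeline;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class SubscriberLookup {

	// Collects the device ids to notify for an event on the given topic,
	// honouring subscriptions made anywhere up the topic hierarchy.
	public List<String> findDeviceIds(OrientGraph graph, Object topicId) {
		List<String> deviceIds = new ArrayList<String>();

		Vertex topic = graph.getVertex(topicId);
		while (topic != null) {
			// subscribers of this topic -> the devices they own
			for (Vertex device : new GremlinPipeline<Vertex, Vertex>(topic)
					.in("subscribes")
					.out("owns")
					.dedup()) {
				deviceIds.add((String) device.getProperty("deviceId"));
			}

			// climb to the parent topic, if any
			Iterator<Vertex> parents =
					topic.getVertices(Direction.OUT, "parent").iterator();
			topic = parents.hasNext() ? parents.next() : null;
		}

		return deviceIds;
	}
}

The whole lookup stays inside the graph engine as cheap pointer hops, exactly the access pattern that made the recursive joins in an RDBMS so expensive.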

Lessons Learned:

> We went with the latest major production release, 2.1 (which had been released only a week earlier), and got burnt by many bugs and corner cases. It took until 2.1.7 for it to finally become stable.

> Use the TinkerPop API (think JDBC, but for graph databases) so you can switch between OrientDB and Neo4j.

> If coding in Java, use the Frames API to maintain the domain mapping (see the sketch after this list).

> The Frames API is not properly tuned; I had to rewrite most of my queries in Gremlin. I am not sure about Neo4j, but OrientDB recommends using Gremlin or SQL for the search queries.

> Partition your clusters appropriately; it will result in proper utilization of all the nodes in your cluster.
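To illustrate the Frames point from the list above: Frames maps vertices onto plain Java interfaces, so the domain model reads naturally. This is only a sketch, reusing the hypothetical edge labels from the earlier traversal example, not our production code:

import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.impls.orient.OrientGraph;
import com.tinkerpop.frames.Adjacency;
import com.tinkerpop.frames.FramedGraph;
import com.tinkerpop.frames.FramedGraphFactory;
import com.tinkerpop.frames.Property;

// Each annotated method is backed by a vertex property or an edge;
// in a real project each interface would live in its own file.
interface Topic {
	@Property("name")
	String getName();

	@Adjacency(label = "parent")
	Topic getParent();

	@Adjacency(label = "subscribes", direction = Direction.IN)
	Iterable<Subscriber> getSubscribers();
}

interface Subscriber {
	@Adjacency(label = "owns")
	Iterable<Device> getDevices();
}

interface Device {
	@Property("deviceId")
	String getDeviceId();
}

public class FramedLookup {
	public static void printDevices(Object topicId) {
		// Wrap the Blueprints graph and read it through the typed interfaces.
		OrientGraph graph = new OrientGraph("remote:localhost/subscriptions");
		FramedGraph<OrientGraph> framed = new FramedGraphFactory().create(graph);

		Topic topic = framed.getVertex(topicId, Topic.class);
		for (Subscriber subscriber : topic.getSubscribers()) {
			for (Device device : subscriber.getDevices()) {
				System.out.println(device.getDeviceId());
			}
		}
	}
}

The typed interfaces are great for readability but, as noted above, the hot paths ended up hand-written in Gremlin for performance.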

SumoShell – Become a shell ninja

Have you ever seen sysadmins doing awesome grep/awk queries and been amazed by it? Now you can do that too, and in a much easier way!

A few months ago, I was writing some parsers in Sumo Logic and thinking: this is so powerful, I wish I could run it on a regular text file. Well, I got a notification from Google about SumoShell today.

And wait for it… they open-sourced their parsing engine.

https://github.com/SumoLogic/sumoshell

What can you do with it? Wonders!

Here are some examples from their blog:


sudo tcpdump 2>/dev/null | sumo search | sumo parse "IP * > *:" as src, dest | sumo parse "length *" as length | sumo sum length by dest | render

[Image: Capture – rendered output of the tcpdump pipeline]

OR:


tail -f logfile | sumo search "ERROR" | sumo parse "thread=*]" | sumo count thread | render-basic

NeoPhobia & Docker

Back in 2004, when I was starting my career, I read something about 'NeoPhobia' on Kathy Sierra's blog. NeoPhobia is the fear of anything new. Details: https://en.wikipedia.org/wiki/Neophobia

And we tend to be NeoPhobic about learning new things by making excuses. The most common excuse? 'We don't need to learn it because it's not good enough for us.' We technologists do it all the time, to save ourselves the trouble of learning. Remember how many people didn't like Git and wanted to stay on SVN?

I try to watch out and not be NeoPhobic about technology, but every now and then it creeps in! I remember installing Docker back in 2013 and playing around with it. To be fair, Docker wasn't as smooth back then and the experience was not good enough. But still, I dismissed it as a viable tool.

Fast-forward two years, and I was forced to look into Docker again (because of licensing issues, we had to run multiple microservice Mule containers on the same server but still wanted isolation)… And my opinion? It's a crime not to know and use Docker, and I have been committing it so far!!!

Even if you don't run multiple containers in production and don't have any need for virtualisation, you MUST learn Docker. Within my first week of learning Docker, I am already using it for so many things. Don't know a thing about NPM but still want to run a Node server? Want to test a 5-node Hazelcast cluster on your little laptop but don't have enough processing power? Limited by Mule licensing and deploying all your microservices in one container with no isolation? Look no more… Docker is your friend!

Taking ThreadDump from Java Code

Copied from HERE with some code modifications.

import java.io.ByteArrayOutputStream;
import java.io.UnsupportedEncodingException;
import java.lang.management.ManagementFactory;

public class StackTraceUtils {
	// RuntimeMXBean names typically look like "12345@hostname";
	// the pid is the part before the '@'.
	private static final char PID_SEPARATOR = '@';
	private static String pathToJStack = "jstack";


	private static String acquirePid()
	{
		String mxName = ManagementFactory.getRuntimeMXBean().getName();

		int index = mxName.indexOf(PID_SEPARATOR);

		String result;

		if (index != -1) {
			result = mxName.substring(0, index);
		} else {
			throw new IllegalStateException("Could not acquire pid using " + mxName);
		}

		return result;
	}


	// Runs `jstack -l <pid>` against the current JVM and returns its output.
	public static String executeJstack()
	{
		ProcessInterface pi = new ProcessInterface();

		int exitCode;

		ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();

		try {
			exitCode = pi.run(new String[] { pathToJStack, "-l", acquirePid(),}, byteArrayOutputStream);
		} catch (Exception e) {
			throw new IllegalStateException("Error invoking jstack", e);
		}

		if (exitCode != 0) {
			throw new IllegalStateException("Bad jstack exit code " + exitCode);
		}


		try {
			String str = new String(byteArrayOutputStream.toByteArray(), "UTF-8");
			return str;
		} catch (UnsupportedEncodingException e) {
			throw new RuntimeException(e);
		}

	}

	public static void main(String[] args) {

		final String s = executeJstack();
		System.out.println("s = " + s);
	}
}




import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

public class ProcessInterface
{	
	private final Map<String, String> addModifyEnv;
	private final Set<String> removeEnv;
	
	static class StreamGobbler extends Thread
	{
		private static int gobblerNumber = 0;
		
		private final InputStream is;
		private final OutputStream os;
		
		private static synchronized int getNextGobblerNumber()
		{
			return gobblerNumber++;
		}
		
		public StreamGobbler(InputStream is, OutputStream os)
		{
			super("StreamGobblerThread-" + getNextGobblerNumber());
			
			this.is = is;
			this.os = os;
		}
		
		@Override
		public void run()
		{
			try
			{
				OutputStreamWriter osw = new OutputStreamWriter(os);
				BufferedWriter bw = new BufferedWriter(osw);
				
				InputStreamReader isr = new InputStreamReader(is);
				BufferedReader br = new BufferedReader(isr);
				
				String line = null;
				
				while ((line = br.readLine()) != null)
				{
					bw.write(line);
					bw.newLine();
				}
				
				bw.flush();
				osw.flush();
			}
			catch (IOException ioe)
			{
				throw new IllegalStateException(ioe);
			}
		}
	}
	
	public ProcessInterface()
	{
		this.addModifyEnv = new HashMap<String, String>();
		this.removeEnv = new HashSet<String>();
	}
	
	public void removeEnv(String name)
	{
		if (addModifyEnv.containsKey(name))
		{
			addModifyEnv.remove(name);
		}
		
		removeEnv.add(name);
	}
	
	public void addEnv(String name, String value)
	{
		if (removeEnv.contains(name))
		{
			removeEnv.remove(name);
		}
		
		addModifyEnv.put(name, value);
	}
	
	public int run(String[] command) throws InterruptedException, IOException
	{
		return run(command, null, null, null);
	}
	
	public int run(String[] command, File workingDir) throws InterruptedException, IOException
	{
		return run(command, null, null, workingDir);
	}
	
	public int run(String[] command, OutputStream os) throws InterruptedException, IOException
	{
		return run(command, os, os, null);
	}
	
	public int run(String[] command, OutputStream os, File workingDir) throws InterruptedException, IOException
	{
		return run(command, os, os, workingDir);
	}
	
	public int run(String[] command, OutputStream std, OutputStream err, File workingDir)
			throws InterruptedException, IOException
	{
		ProcessBuilder builder = new ProcessBuilder();
		
		if (workingDir != null)
		{
			builder.directory(workingDir);
		}
		
		Map<String, String> env = builder.environment();
		
		for (String name : removeEnv)
		{
			env.remove(name);
		}
		
		for (Entry<String, String> entry : addModifyEnv.entrySet())
		{
			env.put(entry.getKey(), entry.getValue());
		}
		
		builder.command(command);
		
		Process process = builder.start();
		
		OutputStream outStream = ((std == null) ? System.out : std);
		OutputStream errStream = ((err == null) ? System.err : err);
		
		StreamGobbler outputGobbler = new StreamGobbler(process.getInputStream(), outStream);
		StreamGobbler errorGobbler = new StreamGobbler(process.getErrorStream(), errStream);
		
		outputGobbler.start();
		errorGobbler.start();
		
		int exitVal = process.waitFor();
		
		// Join the gobbler threads so the full output has been written
		// before the exit code is returned to the caller.
		outputGobbler.join();
		errorGobbler.join();
		
		return exitVal;
	}
}


This can be very helpful. For example, in a code block where you suspect a deadlock (say, a timeout from a connection pool), you can log the stack trace so you can come back later and run a thread-dump analysis tool on it. Add it to your code before the operations guy restarts your problematic server and reports an issue in your name!
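As a sketch of that use case (the pool interface below is hypothetical; only the idea matters): capture the dump at the exact moment the timeout fires, while the suspected deadlock is still live.

import java.util.concurrent.TimeoutException;

public class PoolDiagnostics {

	// Hypothetical connection pool; only borrow() matters for the example.
	interface ConnectionPool<C> {
		C borrow(long timeoutMillis) throws TimeoutException;
	}

	public static <C> C borrowWithDiagnostics(ConnectionPool<C> pool) throws TimeoutException {
		try {
			return pool.borrow(5000);
		} catch (TimeoutException e) {
			// Log the dump now, so a thread-dump analysis tool can be run on it
			// later, even if the server gets restarted in the meantime.
			System.err.println("Pool timeout; thread dump:\n" + StackTraceUtils.executeJstack());
			throw e;
		}
	}
}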

My Journey with Mule

Stage 1: Drag & Drop, Enterprise Integration Patterns… Oh So Cool!!

Stage 2: What the **** $$$!!!??? The IDE sucks, weird behavior, silent XML config alteration… How can anyone work with this!!! Going back to my core Java apps…

Stage 3 (used to all the quirks of Mule Studio): Hmmm… I see the point… not so bad after all!!! Just built an entire state-machine system app with DB integration and HTTP & TCP endpoints in 90 minutes!

Stage 4: Wait, they hid all the cool features in the EE edition 😦 … I am supposed to debug my app with nothing but log messages!! That's almost as bad as the days when I used to write EJB code! Hmmm… start looking for an alternative? How is that Spring Integration project coming along?

Adding Functional Power to Java Programs

I am digging the power of the functional style in my everyday programming. I was implementing some failover connection setup in pure Java code and thought about using the ugly Java command pattern as an alternative to closures. In the end, the code turned out not so bad, so I thought I would share it.

Problem:

We have two Quentin servers – Server A and Server B (a kind of recording server used in the broadcast industry). The QuentinApi abstracts the interaction with the server, which maintains some persistent TCP connections. The requirement is that we keep talking to Server A; however, if the connection is not valid, or it goes down for some reason, we need to switch over to Server B.

Here is how the Quentin API looks:

/**
 * 
 * Api to create/update/delete clips on Quantel. Clips are actually created from a channel. The Quantel system has a pool of servers.
 * Every server has a fixed number of channels.
 * 
 * @author smoinuddin
 *
 */
public interface QuentinApi {
	
	public String createClip(String clipName, int server, int channel, DateTime startTime, DateTime endTime) throws QuantelServerLookupFailedException, QuentinApiException;
	
	public void updateClip(String clipName, int server, int channel, DateTime startTime, DateTime endTime) throws QuentinApiException;
	
	public void cancelRecording(String clipName, int server, int channel) throws QuentinApiException;
      ...... // some more similar methods....	
}

Now, all of these methods should have failure detection and should fail over to Server B whenever there is a problem. I have solved similar problems before in Java using dynamic proxies and Spring AOP, but I thought the functional closure style is simpler and easier to understand.

public class QuentinApiFailoverProxy implements QuentinApi {
	private static final Logger LOGGER = LoggerFactory.getLogger(QuentinApiFailoverProxy.class);
	
	private QuentinApiImpl quantelServerA;
	private QuentinApiImpl quantelServerB;
	
	private QuentinApiImpl currentConnection;
	
	public QuentinApiFailoverProxy(QuentinApiImpl quantelServerA,
			QuentinApiImpl quantelServerB) {
		super();
		this.quantelServerA = quantelServerA;
		this.quantelServerB = quantelServerB;
		
		this.currentConnection = quantelServerA;
	}
	
	private abstract class QuantelApiCall<T> {
		abstract T execute() throws QuentinApiException;
		
		// Returning null means "nothing worth logging"; subclasses override
		// this with a human-readable description of the call.
		@Override
		public String toString() {
			return null;
		}
	}
	
	private <T> T execute(QuantelApiCall<T> call) throws QuentinApiException{
		if(call.toString() != null) {
			LOGGER.debug("***executing Call: {}", call);
		}
		try {
			if(currentConnection.validate()) {
				return call.execute();
			} else {
				LOGGER.warn("Could not validate current connection: " + currentConnection);
				throw new QuantelCommunicationException("Could not validate current connection: " + currentConnection);
			}
		} catch (QuantelServerLookupFailedException | 
				QuantelCommunicationException | 
				NullPointerException| 
				org.omg.CORBA.COMM_FAILURE |
				org.omg.CORBA.OBJECT_NOT_EXIST e) {
			
			LOGGER.warn("Corba Failed to talk to {}", currentConnection, e);
			resetConnections();
			LOGGER.warn("Trying to talk to {}", currentConnection);
			
			return call.execute();
		} finally {
			if(call.toString() != null) {
				LOGGER.debug("***finished executing call: " + call);
			}
		}
	}
	
	private synchronized void resetConnections() throws QuantelCommunicationException {
		boolean validCurrentConnection = false;
		try {
			currentConnection.reset();
			if(currentConnection.validate()) {
				validCurrentConnection = true;
			}
		} catch (Exception e) { /* ignored: fall through and try the alternative */ }
		
		if(!validCurrentConnection) {
			currentConnection = getAlternative();			
			currentConnection.reset();
			if(!currentConnection.validate()) {
				LOGGER.warn("Failed to talk to main connection & alternative connection . Alternative connection just tried = {}" , currentConnection);
				throw new QuantelCommunicationException("Failed to talk to main connection & alternative connection . Alternative connection just tried: " + currentConnection);
			}
		} else {
			LOGGER.warn("re-initiated the current connection to {}");
		}
		
	}

	private QuentinApiImpl getAlternative() {
		if(currentConnection.equals(quantelServerA)) {
			return quantelServerB;
		} else {
			return quantelServerA;
		}
	}    

	@Override
	public String createClip(final String clipName, final int serverId, final int channel,
			final DateTime startTime,final  DateTime endTime)
			throws QuantelServerLookupFailedException, QuentinApiException {
		
		return execute(new QuantelApiCall<String>() {
			@Override
			String execute() throws QuentinApiException {
				return getCurrentConnection().createClip(clipName, serverId, channel, startTime, endTime);
			}

			@Override
			public String toString() {
				return "createClip: " + clipName ;
			}});		
	}

	@Override
	public void updateClip(final String clipName,final  int server,final  int channel,
			final DateTime startTime, final DateTime endTime) throws QuentinApiException {
		execute(new QuantelApiCall<Object>() {
			@Override
			Object execute() throws QuentinApiException {
				getCurrentConnection().updateClip(clipName, server, channel, startTime, endTime);
				return null;
			}

			@Override
			public String toString() {
				return "Update Clip: " + clipName;
			}});	
	}

	private synchronized QuentinApiImpl getCurrentConnection() {
		return currentConnection;
	}

.................. and so on.
}

Notice how the execute method wraps each method call and adds the exception handling, the connection reset, and the try-the-alternative-connection behavior. Introducing QuantelApiCall lets us bundle our code and pass it as an argument to another method (code as object). Note that execute() does not take any parameters; instead, the anonymous class captures the final local variables and carries them over to the new context.

One thing you need to be careful about is using getCurrentConnection() inside the closures. If I had captured the connection in a variable instead, the connection bound at the time of 'closing' (i.e., when the closure object was created) would be used on every subsequent execute() call. Method calls, however, are evaluated afresh on each invocation, so getCurrentConnection() always picks up the latest connection after a failover.
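As an aside: on Java 8, the same pattern collapses into lambdas. A lambda cannot override toString(), so the log description has to move into a parameter. This is only a sketch of the idea, not code from the project:

	// A functional interface version of QuantelApiCall; declaring the checked
	// exception lets it propagate out of the lambda body.
	@FunctionalInterface
	interface ApiCall<T> {
		T run() throws QuentinApiException;
	}
	
	private <T> T execute(String description, ApiCall<T> call) throws QuentinApiException {
		LOGGER.debug("***executing Call: {}", description);
		// ... same validate / failover / retry logic as in execute() above ...
		return call.run();
	}
	
	@Override
	public String createClip(final String clipName, final int serverId, final int channel,
			final DateTime startTime, final DateTime endTime)
			throws QuantelServerLookupFailedException, QuentinApiException {
		return execute("createClip: " + clipName,
				() -> getCurrentConnection().createClip(clipName, serverId, channel, startTime, endTime));
	}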

Again, the code can be called by multiple threads at the same time, so you need synchronization in resetConnections() (you don't want two threads trying to reset the connection at the same time). The other interesting thing is that you need 'synchronized' on getCurrentConnection() to avoid stale values from per-thread caching. I have noticed a lot of Java developers mistakenly think synchronized is only for mutual exclusion, but it is also necessary for visibility of data between threads.
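Finally, wiring the proxy up could look something like the snippet below; the QuentinApiImpl constructor argument is hypothetical:

	// Callers only ever see the QuentinApi interface and stay oblivious
	// to the failover logic behind it.
	QuentinApiImpl serverA = new QuentinApiImpl("quentin-a.internal");
	QuentinApiImpl serverB = new QuentinApiImpl("quentin-b.internal");
	
	QuentinApi api = new QuentinApiFailoverProxy(serverA, serverB);
	
	String clipId = api.createClip("match-highlights", 1, 2,
			DateTime.now(), DateTime.now().plusMinutes(30));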