We use cookies to ensure that we give you the best experience on our website. By continuing to use the website you agree for the use of cookies for better website performance and a personalized experience.

Druid & Kafka Indexing Service: How to fix the merging segments issue

Rafał Michalski
June 18, 2017
Druid & Kafka Indexing Service: How to fix the merging segments issue
Rafał Michalski
June 18, 2017
X MIN Read
June 18, 2017
X MIN Read
June 18, 2017
X MIN Read

This article describes the bumpy road we travelled to successfully run Hadoop Index Job on AWS EMR for the latest Druid 0.10.0  with deep storage on S3 Signature Version 4 Only.


Deep.BI provides multitenant Druid infrastructure in the cloud with complementary Kafka and Flink software for scalable and reliable data collection, enrichment, storage, analytics, real-time usage, and robotic process automation (RPA). Recently, we upgraded our infrastructure to the latest version of Druid (0.10.0) and the newest way of ingesting data through the Kafka Indexing Service (KIS). The biggest problem in this setup is the enormous amount of sharded segments which are supposed to be merged by KIS, but aren't. This is planned to be fixed in future Druid releases, but until then we have had to fix it by ourselves.

Our Druid architecture

We decided to run our Druid infrastructure in a hybrid cloud:

  • KIS (Middlemanagers + Overlord) run in the private cloud (dedicated servers from a datacenter)
  • Brokers + Historicals run in the private cloud too
  • Coordinator runs on AWS EC2
  • for deep storage we use Amazon S3
  • as a metadata store we chose AWS RDS

(we publish our reasons for these decisions in a separate article, in the meantime, you can ask us via email: tech@deep.bi)

Our approach

To solve the mentioned problem we had to setup a Hadoop cluster to periodically run Hadoop Index Jobs.

Due to its periodic nature, we decided to dynamically start an AWS Elastic Map Reduce (EMR) instance with an EMR-terminate option after the job's completion. The unexpected problems we faced were related to different S3 versions support and compatibility of Druid extensions, which resulted in non-trivial tasks such as:

  • Druid recompilation with proper extensions
  • VPN between AWS and our cloud


Creating a job description task

First, we assembled a job task description

according to http://druid.io/docs/0.10.0/ingestion/batch-ingestion.htm and http://druid.io/docs/latest/operations/other-hadoop.html.

Adding Hadoop EMR jars and config files to Druid

Next, we added Hadoop client jars compatible with our EMR version to DRUID_DIR/hadoop-dependencies/hadoop-client/2.7.3.

Additionally, we copied a bunch of the *-site.xml files from EMR's /etc/hadoop/conf to DRUID_DIR/conf/druid/_common directory.

Setting up a VPN between your Druid and EMR

After submitting the job to the indexer we got our first error:

2017-05-25T15:41:01,887 INFO [Thread-56] org.apache.hadoop.hdfs.DFSClient - Exception in createBlockOutputStreamjava.net.ConnectException: Connection refused

The problem was that the EMR data nodes were only accessible via local IP addresses, while our middle managers were running on our premises.

Thus, we had to set up a VPN connection between our private cloud and AWS and configure our Druid cluster to work with Amazon EMR. Then we thought we were finally ready to successfully submit an indexing task, but...

Check out what Deep.BI is all about!

Tuning configs and adding other libraries to Druid

After successfully solving all the networking problems, we hit another error:

2017-05-25T22:53:16,654 ERROR [task-runner-0-priority-0] io.druid.indexer.hadoop.DatasourceInputFormat - Exception thrown finding location of splitsjava.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found

A quick look into core-site.xml revealed this:

<property><name>fs.s3.impl</name><value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value></property> <property><name>fs.s3n.impl</name><value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value></property>

So we thought of two possible solutions:

  • somehow get EmrFileSystem jars from somewhere
  • replace implementation with org.apache.hadoop.fs.s3native.NativeS3FileSystem as suggested by this article.

In the beginning, we went with the first solution, so we downloaded a bunch of jar files from the EMR, but we immediately faced class dependency conflicts, so we decided not to continue with this solution.

Instead, we replaced the above core-site.xml properties with:


following with S3 credentials:

<property><name>fs.s3.awsAccessKeyId</name><value>YOUR_S3_ACCESS_KEY</value></property> <property><name>fs.s3.awsSecretAccessKey</name><value>YOUR_S3_SECRET_KEY</value></property> <property><name>fs.s3n.awsAccessKeyId</name><value>YOUR_S3_ACCESS_KEY</value></property> <property><name>fs.s3n.awsSecretAccessKey</name><value>YOUR_S3_SECRET_KEY</value></property>

Well, the next error was obvious:

2017-05-26T00:17:09,753 ERROR [task-runner-0-priority-0] io.druid.indexer.hadoop.DatasourceInputFormat - Exception thrown finding location of splitsjava.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found

as we didn't include hadoop-aws-2.7.3.jar in hadoop-dependencies. We decided to use EMR's patched version hadoop-aws-2.7.3-amzn-2.jar. You could probably use the vanilla version from maven as well but we didn't try it.

Adjusting configs to S3 region - solving "Signature Version 4 only" issue

EMR map reduce jobs finally started but soon failed with the following error:

2017-05-26T10:33:18,346 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1495712572601_0013_m_000003_0, Status : FAILEDError: java.lang.RuntimeException: java.io.IOException: s3n://SOME_BUCKET_NAME : 400 : Bad Request

If you google it, you might stumble upon HADOOP-13325 and other relevant issues. Long story short: our S3 region was Signature Version 4 Only and this isn't compatible with the NativeS3FileSystem implemented in hadoop-aws-2.7.3.

Following some advice on google groups, we tried to enforce the new s3a filesystem implementation.

We eventually changed the relevant properties in core-site.xml to:

<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3a.S3AFileSystem</value></property> <property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3a.S3AFileSystem</value></property>

following S3 credentials for s3a:

<property><name>fs.s3a.access.key</name><value>YOUR_S3_ACCESS_KEY</value></property> <property><name>fs.s3a.secret.key</name><value>YOUR_S3_SECRET_KEY</value></property>

and some additional S3a magic:

<property><name>fs.s3a.endpoint</name><value>s3.eu-central-1.amazonaws.com</value></property> <property><name>com.amazonaws.services.s3.enableV4</name><value>true</value></property>

Patching and recompiling Druid

After the last update, our task didn't even manage to start the map reduce job, and failed with:

Caused by: java.lang.NoSuchMethodError: com.amazonaws.services.s3.transfer.TransferManagerConfiguration.setMultipartUploadThreshold(I)V

If you want more details, go to HADOOP-12420 and http://deploymentzone.com/2015/12/20/s3a-on-spark-on-aws-ec2/

One of the proposed solutions was to replace Druid's aws-sdk-java-1.10.21 with with aws-sdk-java-1.7.4 and "not a more recent version".

But fortunately at the end of the HADOOP-12420 thread this pull request is mentioned which leads us to #643.

So we decided to just recompile the aws-java-sdk-s3-1.10.21.jar with this backward compatibility enabling patch and replace the vanilla file in DRUID_DIR/lib/aws-java-sdk-s3-1.10.21.jar.

This required all of the Druid middle managers to be restarted. The map reduce job finally took off without errors. The job successfully connected with S3 and accessed the source segments. But we weren't out of the woods yet! The last map reduce job finished composing segments, but while it was about to upload them, this happened:

2017-05-26T16:55:59,095 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1495712572601_0016_r_000000_0, Status : FAILEDError: io.druid.java.util.common.IAE: Unknown file system scheme [s3a]at io.druid.indexer.JobHelper.serializeOutIndex(JobHelper.java:449)

Looking at JobHelper.serializeOutIndex revealed how indexer is composing loadSpec for new segments from outputFS which, in this instance, was S3AFileSystem.

Without further ado we assembled this simple patch, recompiled druid, reinstalled, restarted middle managers and voilà.

After that, the job finally succeeded!

Learn more about our Druid expertise and how we can help

If you'd like more details or support just drop us a line at hello@deep.bi

You Might Also Like