my first commit

presto-jd
wu ming 10 years ago
commit 861eb45025

22
.gitignore vendored

@ -0,0 +1,22 @@
*.iml
*.ipr
*.iws
target/
/var
/*/var/
pom.xml.versionsBackup
test-output/
/atlassian-ide-plugin.xml
.idea
.DS_Store
.classpath
.settings
.project
temp-testng-customsuite.xml
test-output
.externalToolBuilders
*~
benchmark_outputs
*.pyc
*.class
.checkstyle

@ -0,0 +1,19 @@
language: java
jdk:
- oraclejdk8
env:
global:
- MAVEN_OPTS="-Xmx256M"
sudo: false
cache:
directories:
- $HOME/.m2/io
- $HOME/.m2/org
install: mvn install -DskipTests=true -Dmaven.javadoc.skip=true -B -V -q -T 2
script: mvn test -Dair.check.skip-dependency=true

@ -0,0 +1,11 @@
# Contributing to Presto
## Contributor License Agreement ("CLA")
In order to accept your pull request, we need you to submit a CLA. You only need to do this once, so if you've done this for another Facebook open source project, you're good to go. If you are submitting a pull request for the first time, just let us know that you have completed the CLA and we can cross-check with your GitHub username.
Complete your CLA here: <https://code.facebook.com/cla>
## License
By contributing to Presto, you agree that your contributions will be licensed under the [Apache License Version 2.0 (APLv2)](LICENSE).

@ -0,0 +1,202 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

@ -0,0 +1,73 @@
# Presto
Presto is a distributed SQL query engine for big data.
See the [User Manual](https://prestodb.io/docs/current/) for deployment instructions and end user documentation.
## Requirements
* Mac OS X or Linux
* Java 8, 64-bit
* Maven 3.2.3+ (for building)
* Python 2.4+ (for running with the launcher script)
## Building Presto
Presto is a standard Maven project. Simply run the following command from the project root directory:
mvn clean install
On the first build, Maven will download all the dependencies from the internet and cache them in the local repository (`~/.m2/repository`), which can take a considerable amount of time. Subsequent builds will be faster.
Presto has a comprehensive set of unit tests that can take several minutes to run. You can disable the tests when building:
mvn clean install -DskipTests
## Running Presto in your IDE
### Overview
After building Presto for the first time, you can load the project into your IDE and run the server. We recommend using [IntelliJ IDEA](http://www.jetbrains.com/idea/). Because Presto is a standard Maven project, you can import it into your IDE using the root `pom.xml` file. In IntelliJ, choose Open Project from the Quick Start box or choose Open from the File menu and select the root `pom.xml` file.
After opening the project in IntelliJ, double check that the Java SDK is properly configured properly for the project:
* Open the File menu and select Project Structure
* In the SDKs section, ensure that a 1.8 JDK is selected (create one if none exist)
* In the Project section, ensure the Project language level is set to 8.0 as Presto makes use of several Java 8 language features
Presto comes with sample configuration that should work out-of-the-box for development. Use the following options to create a run configuration:
* Main Class: `com.facebook.presto.server.PrestoServer`
* VM Options: `-ea -Xmx2G -Dconfig=etc/config.properties -Dlog.levels-file=etc/log.properties`
* Working directory: `$MODULE_DIR$`
* Use classpath of module: `presto-main`
The working directory should be the `presto-main` subdirectory. In IntelliJ, using `$MODULE_DIR$` accomplishes this automatically.
Additionally, the Hive plugin must be configured with location of your Hive metastore Thrift service. Add the following to the list of VM options, replacing `localhost:9083` with the correct host and port (or use the below value if you do not have a Hive metastore):
-Dhive.metastore.uri=thrift://localhost:9083
### Using SOCKS for Hive or HDFS
If your Hive metastore or HDFS cluster is not directly accessible to your local machine, you can use SSH port forwarding to access it. Setup a dynamic SOCKS proxy with SSH listening on local port 1080:
ssh -v -N -D 1080 server
Then add the following to the list of VM options:
-Dhive.metastore.thrift.client.socks-proxy=localhost:1080
### Running the CLI
Start the CLI to connect to the server and run SQL queries:
presto-cli/target/presto-cli-*-executable.jar
Run a query to see the nodes in the cluster:
SELECT * FROM system.runtime.nodes;
In the sample configuration, the Hive connector is mounted in the `hive` catalog, so you can run the following queries to show the tables in the Hive database `default`:
SHOW TABLES FROM hive.default;

File diff suppressed because it is too large Load Diff

@ -0,0 +1,201 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-root</artifactId>
<version>0.107</version>
</parent>
<artifactId>presto-base-jdbc</artifactId>
<name>presto-base-jdbc</name>
<description>Presto - Base JDBC Connector</description>
<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
</properties>
<dependencies>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>bootstrap</artifactId>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>log</artifactId>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>configuration</artifactId>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>concurrent</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>
<dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>annotations</artifactId>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
<!-- Presto SPI -->
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-spi</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>slice</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-dbutils</groupId>
<artifactId>commons-dbutils</artifactId>
<version>1.6</version>
</dependency>
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-main</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>units</artifactId>
<scope>provided</scope>
</dependency>
<!-- for testing -->
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>testing</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>json</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-tpch</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.airlift.tpch</groupId>
<artifactId>tpch</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-tests</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<!-- integration tests take a very long time so only run them in the CI server -->
<excludes>
<exclude>**/TestJdbcDistributedQueries.java</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>ci</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes combine.self="override" />
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

@ -0,0 +1,62 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.plugin.jdbc;
import io.airlift.configuration.Config;
import javax.validation.constraints.NotNull;
public class BaseJdbcConfig
{
private String connectionUrl;
private String connectionUser;
private String connectionPassword;
@NotNull
public String getConnectionUrl()
{
return connectionUrl;
}
@Config("connection-url")
public BaseJdbcConfig setConnectionUrl(String connectionUrl)
{
this.connectionUrl = connectionUrl;
return this;
}
public String getConnectionUser()
{
return connectionUser;
}
@Config("connection-user")
public BaseJdbcConfig setConnectionUser(String connectionUser)
{
this.connectionUser = connectionUser;
return this;
}
public String getConnectionPassword()
{
return connectionPassword;
}
@Config("connection-password")
public BaseJdbcConfig setConnectionPassword(String connectionPassword)
{
this.connectionPassword = connectionPassword;
return this;
}
}

@ -0,0 +1,62 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.plugin.jdbc;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorPartitionResult;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.TupleDomain;
import io.airlift.slice.Slice;
import javax.annotation.Nullable;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
import java.util.List;
import java.util.Set;
public interface JdbcClient
{
Set<String> getSchemaNames();
List<SchemaTableName> getTableNames(@Nullable String schema);
@Nullable
JdbcTableHandle getTableHandle(SchemaTableName schemaTableName);
List<JdbcColumnHandle> getColumns(JdbcTableHandle tableHandle);
ConnectorPartitionResult getPartitions(JdbcTableHandle jdbcTableHandle, TupleDomain<ColumnHandle> tupleDomain);
ConnectorSplitSource getPartitionSplits(JdbcPartition jdbcPartition);
Connection getConnection(JdbcSplit split)
throws SQLException;
String buildSql(JdbcSplit split, List<JdbcColumnHandle> columnHandles);
JdbcOutputTableHandle beginCreateTable(ConnectorTableMetadata tableMetadata);
void commitCreateTable(JdbcOutputTableHandle handle, Collection<Slice> fragments);
void dropTable(JdbcTableHandle jdbcTableHandle);
String buildInsertSql(JdbcOutputTableHandle handle);
Connection getConnection(JdbcOutputTableHandle handle)
throws SQLException;
}

@ -0,0 +1,97 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.plugin.jdbc;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.type.Type;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Objects;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkNotNull;
public final class JdbcColumnHandle
implements ColumnHandle
{
private final String connectorId;
private final String columnName;
private final Type columnType;
@JsonCreator
public JdbcColumnHandle(
@JsonProperty("connectorId") String connectorId,
@JsonProperty("columnName") String columnName,
@JsonProperty("columnType") Type columnType)
{
this.connectorId = checkNotNull(connectorId, "connectorId is null");
this.columnName = checkNotNull(columnName, "columnName is null");
this.columnType = checkNotNull(columnType, "columnType is null");
}
@JsonProperty
public String getConnectorId()
{
return connectorId;
}
@JsonProperty
public String getColumnName()
{
return columnName;
}
@JsonProperty
public Type getColumnType()
{
return columnType;
}
public ColumnMetadata getColumnMetadata()
{
return new ColumnMetadata(columnName, columnType, false);
}
@Override
public boolean equals(Object obj)
{
if (this == obj) {
return true;
}
if ((obj == null) || (getClass() != obj.getClass())) {
return false;
}
JdbcColumnHandle o = (JdbcColumnHandle) obj;
return Objects.equals(this.connectorId, o.connectorId) &&
Objects.equals(this.columnName, o.columnName);
}
@Override
public int hashCode()
{
return Objects.hash(connectorId, columnName);
}
@Override
public String toString()
{
return toStringHelper(this)
.add("connectorId", connectorId)
.add("columnName", columnName)
.add("columnType", columnType)
.toString();
}
}

@ -0,0 +1,98 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.plugin.jdbc;
import com.facebook.presto.spi.Connector;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorMetadata;
import com.facebook.presto.spi.ConnectorRecordSetProvider;
import com.facebook.presto.spi.ConnectorRecordSinkProvider;
import com.facebook.presto.spi.ConnectorSplitManager;
import io.airlift.bootstrap.LifeCycleManager;
import io.airlift.log.Logger;
import javax.inject.Inject;
import static com.google.common.base.Preconditions.checkNotNull;
public class JdbcConnector
implements Connector
{
private static final Logger log = Logger.get(JdbcConnector.class);
private final LifeCycleManager lifeCycleManager;
private final JdbcMetadata jdbcMetadata;
private final JdbcSplitManager jdbcSplitManager;
private final JdbcRecordSetProvider jdbcRecordSetProvider;
private final JdbcHandleResolver jdbcHandleResolver;
private final JdbcRecordSinkProvider jdbcRecordSinkProvider;
@Inject
public JdbcConnector(
LifeCycleManager lifeCycleManager,
JdbcMetadata jdbcMetadata,
JdbcSplitManager jdbcSplitManager,
JdbcRecordSetProvider jdbcRecordSetProvider,
JdbcHandleResolver jdbcHandleResolver,
JdbcRecordSinkProvider jdbcRecordSinkProvider)
{
this.lifeCycleManager = checkNotNull(lifeCycleManager, "lifeCycleManager is null");
this.jdbcMetadata = checkNotNull(jdbcMetadata, "jdbcMetadata is null");
this.jdbcSplitManager = checkNotNull(jdbcSplitManager, "jdbcSplitManager is null");
this.jdbcRecordSetProvider = checkNotNull(jdbcRecordSetProvider, "jdbcRecordSetProvider is null");
this.jdbcHandleResolver = checkNotNull(jdbcHandleResolver, "jdbcHandleResolver is null");
this.jdbcRecordSinkProvider = checkNotNull(jdbcRecordSinkProvider, "jdbcRecordSinkProvider is null");
}
@Override
public ConnectorMetadata getMetadata()
{
return jdbcMetadata;
}
@Override
public ConnectorSplitManager getSplitManager()
{
return jdbcSplitManager;
}
@Override
public ConnectorRecordSetProvider getRecordSetProvider()
{
return jdbcRecordSetProvider;
}
@Override
public ConnectorHandleResolver getHandleResolver()
{
return jdbcHandleResolver;
}
@Override
public ConnectorRecordSinkProvider getRecordSinkProvider()
{
return jdbcRecordSinkProvider;
}
@Override
public final void shutdown()
{
try {
lifeCycleManager.stop();
}
catch (Exception e) {
log.error(e, "Error shutting down connector");
}
}
}

@ -0,0 +1,76 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.plugin.jdbc;
import com.facebook.presto.spi.Connector;
import com.facebook.presto.spi.ConnectorFactory;
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Injector;
import com.google.inject.Module;
import io.airlift.bootstrap.Bootstrap;
import java.util.Map;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Strings.isNullOrEmpty;
public class JdbcConnectorFactory
implements ConnectorFactory
{
private final String name;
private final Module module;
private final Map<String, String> optionalConfig;
private final ClassLoader classLoader;
public JdbcConnectorFactory(String name, Module module, Map<String, String> optionalConfig, ClassLoader classLoader)
{
checkArgument(!isNullOrEmpty(name), "name is null or empty");
this.name = name;
this.module = checkNotNull(module, "module is null");
this.optionalConfig = ImmutableMap.copyOf(checkNotNull(optionalConfig, "optionalConfig is null"));
this.classLoader = checkNotNull(classLoader, "classLoader is null");
}
@Override
public String getName()
{
return name;
}
@Override
public Connector create(String connectorId, Map<String, String> requiredConfig)
{
checkNotNull(requiredConfig, "requiredConfig is null");
checkNotNull(optionalConfig, "optionalConfig is null");
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
Bootstrap app = new Bootstrap(new JdbcModule(connectorId), module);
Injector injector = app
.strictConfig()
.doNotInitializeLogging()
.setRequiredConfigurationProperties(requiredConfig)
.setOptionalConfigurationProperties(optionalConfig)
.initialize();
return injector.getInstance(JdbcConnector.class);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}

@ -0,0 +1,53 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.plugin.jdbc;
import java.util.Objects;
import static com.google.common.base.Preconditions.checkNotNull;
public final class JdbcConnectorId
{
private final String id;
public JdbcConnectorId(String id)
{
this.id = checkNotNull(id, "id is null");
}
@Override
public String toString()
{
return id;
}
@Override
public int hashCode()
{
return Objects.hash(id);
}
@Override
public boolean equals(Object obj)
{
if (this == obj) {
return true;
}
if ((obj == null) || (getClass() != obj.getClass())) {
return false;
}
JdbcConnectorId other = (JdbcConnectorId) obj;
return Objects.equals(this.id, other.id);
}
}

@ -0,0 +1,84 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.plugin.jdbc;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorTableHandle;
import javax.inject.Inject;
import static com.google.common.base.Preconditions.checkNotNull;
public class JdbcHandleResolver
implements ConnectorHandleResolver
{
private final String connectorId;
@Inject
public JdbcHandleResolver(JdbcConnectorId clientId)
{
this.connectorId = checkNotNull(clientId, "clientId is null").toString();
}
@Override
public boolean canHandle(ConnectorTableHandle tableHandle)
{
return tableHandle instanceof JdbcTableHandle && ((JdbcTableHandle) tableHandle).getConnectorId().equals(connectorId);
}
@Override
public boolean canHandle(ColumnHandle columnHandle)
{
return columnHandle instanceof JdbcColumnHandle && ((JdbcColumnHandle) columnHandle).getConnectorId().equals(connectorId);
}
@Override
public boolean canHandle(ConnectorSplit split)
{
return split instanceof JdbcSplit && ((JdbcSplit) split).getConnectorId().equals(connectorId);
}
@Override
public boolean canHandle(ConnectorOutputTableHandle tableHandle)
{
return (tableHandle instanceof JdbcOutputTableHandle) && ((JdbcOutputTableHandle) tableHandle).getConnectorId().equals(connectorId);
}
@Override
public Class<? extends ConnectorTableHandle> getTableHandleClass()
{
return JdbcTableHandle.class;
}
@Override
public Class<? extends ColumnHandle> getColumnHandleClass()
{
return JdbcColumnHandle.class;
}
@Override
public Class<? extends ConnectorSplit> getSplitClass()
{
return JdbcSplit.class;
}
@Override
public Class<? extends ConnectorOutputTableHandle> getOutputTableHandleClass()
{
return JdbcOutputTableHandle.class;
}
}

@ -0,0 +1,209 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.plugin.jdbc;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorMetadata;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.InsertOption;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.SchemaTablePrefix;
import com.facebook.presto.spi.TableNotFoundException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.slice.Slice;
import javax.inject.Inject;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import static com.facebook.presto.plugin.jdbc.Types.checkType;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.StandardErrorCode.PERMISSION_DENIED;
import static com.google.common.base.Preconditions.checkNotNull;
public class JdbcMetadata
implements ConnectorMetadata
{
private final JdbcClient jdbcClient;
private final boolean allowDropTable;
@Inject
public JdbcMetadata(JdbcConnectorId connectorId, JdbcClient jdbcClient, JdbcMetadataConfig config)
{
this.jdbcClient = checkNotNull(jdbcClient, "client is null");
checkNotNull(config, "config is null");
allowDropTable = config.isAllowDropTable();
}
@Override
public List<String> listSchemaNames(ConnectorSession session)
{
return ImmutableList.copyOf(jdbcClient.getSchemaNames());
}
@Override
public JdbcTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName)
{
return jdbcClient.getTableHandle(tableName);
}
@Override
public ConnectorTableMetadata getTableMetadata(ConnectorTableHandle table)
{
JdbcTableHandle handle = checkType(table, JdbcTableHandle.class, "tableHandle");
ImmutableList.Builder<ColumnMetadata> columnMetadata = ImmutableList.builder();
for (JdbcColumnHandle column : jdbcClient.getColumns(handle)) {
columnMetadata.add(column.getColumnMetadata());
}
return new ConnectorTableMetadata(handle.getSchemaTableName(), columnMetadata.build());
}
@Override
public List<SchemaTableName> listTables(ConnectorSession session, String schemaNameOrNull)
{
return jdbcClient.getTableNames(schemaNameOrNull);
}
@Override
public ColumnHandle getSampleWeightColumnHandle(ConnectorTableHandle tableHandle)
{
return null;
}
@Override
public Map<String, ColumnHandle> getColumnHandles(ConnectorTableHandle tableHandle)
{
JdbcTableHandle jdbcTableHandle = checkType(tableHandle, JdbcTableHandle.class, "tableHandle");
ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder();
for (JdbcColumnHandle column : jdbcClient.getColumns(jdbcTableHandle)) {
columnHandles.put(column.getColumnMetadata().getName(), column);
}
return columnHandles.build();
}
@Override
public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix)
{
ImmutableMap.Builder<SchemaTableName, List<ColumnMetadata>> columns = ImmutableMap.builder();
for (SchemaTableName tableName : listTables(session, prefix.getSchemaName())) {
try {
JdbcTableHandle tableHandle = jdbcClient.getTableHandle(tableName);
if (tableHandle == null) {
continue;
}
columns.put(tableName, getTableMetadata(tableHandle).getColumns());
}
catch (TableNotFoundException e) {
// table disappeared during listing operation
}
}
return columns.build();
}
@Override
public ColumnMetadata getColumnMetadata(ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
{
checkType(tableHandle, JdbcTableHandle.class, "tableHandle");
return checkType(columnHandle, JdbcColumnHandle.class, "columnHandle").getColumnMetadata();
}
@Override
public boolean canCreateSampledTables(ConnectorSession session)
{
return false;
}
@Override
public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
{
throw new PrestoException(NOT_SUPPORTED, "This connector does not support creating tables");
}
@Override
public void dropTable(ConnectorTableHandle tableHandle)
{
if (!allowDropTable) {
throw new PrestoException(PERMISSION_DENIED, "DROP TABLE is disabled in this catalog");
}
JdbcTableHandle handle = checkType(tableHandle, JdbcTableHandle.class, "tableHandle");
jdbcClient.dropTable(handle);
}
@Override
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
{
return jdbcClient.beginCreateTable(tableMetadata);
}
@Override
public void commitCreateTable(ConnectorOutputTableHandle tableHandle, Collection<Slice> fragments)
{
JdbcOutputTableHandle handle = checkType(tableHandle, JdbcOutputTableHandle.class, "tableHandle");
jdbcClient.commitCreateTable(handle, fragments);
}
@Override
public void renameTable(ConnectorTableHandle tableHandle, SchemaTableName newTableName)
{
throw new PrestoException(NOT_SUPPORTED, "This connector does not support renaming tables");
}
@Override
public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle, InsertOption insertOption)
{
throw new PrestoException(NOT_SUPPORTED, "This connector does not support inserts");
}
@Override
public void commitInsert(ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments)
{
throw new UnsupportedOperationException();
}
@Override
public void createView(ConnectorSession session, SchemaTableName viewName, String viewData, boolean replace)
{
throw new PrestoException(NOT_SUPPORTED, "This connector does not support creating views");
}
@Override
public void dropView(ConnectorSession session, SchemaTableName viewName)
{
throw new PrestoException(NOT_SUPPORTED, "This connector does not support dropping views");
}
@Override
public List<SchemaTableName> listViews(ConnectorSession session, String schemaNameOrNull)
{
return ImmutableList.of();
}
@Override
public Map<SchemaTableName, String> getViews(ConnectorSession session, SchemaTablePrefix prefix)
{
return ImmutableMap.of();
}
}

@ -0,0 +1,35 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.plugin.jdbc;
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
public class JdbcMetadataConfig
{
private boolean allowDropTable;
public boolean isAllowDropTable()
{
return allowDropTable;
}
@Config("allow-drop-table")
@ConfigDescription("Allow connector to drop tables")
public JdbcMetadataConfig setAllowDropTable(boolean allowDropTable)
{
this.allowDropTable = allowDropTable;
return this;
}
}

@ -0,0 +1,45 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.plugin.jdbc;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.airlift.configuration.ConfigBinder.configBinder;
public class JdbcModule
implements Module
{
private final String connectorId;
public JdbcModule(String connectorId)
{
this.connectorId = checkNotNull(connectorId, "connector id is null");
}
@Override
public void configure(Binder binder)
{
binder.bind(JdbcConnectorId.class).toInstance(new JdbcConnectorId(connectorId));
binder.bind(JdbcMetadata.class).in(Scopes.SINGLETON);
binder.bind(JdbcSplitManager.class).in(Scopes.SINGLETON);
binder.bind(JdbcRecordSetProvider.class).in(Scopes.SINGLETON);
binder.bind(JdbcHandleResolver.class).in(Scopes.SINGLETON);
binder.bind(JdbcRecordSinkProvider.class).in(Scopes.SINGLETON);
binder.bind(JdbcConnector.class).in(Scopes.SINGLETON);
configBinder(binder).bindConfig(JdbcMetadataConfig.class);
}
}

@ -0,0 +1,181 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.plugin.jdbc;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
import com.facebook.presto.spi.type.Type;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.String.format;
public class JdbcOutputTableHandle
implements ConnectorOutputTableHandle
{
private final String connectorId;
private final String catalogName;
private final String schemaName;
private final String tableName;
private final List<String> columnNames;
private final List<Type> columnTypes;
private final String tableOwner;
private final String temporaryTableName;
private final String connectionUrl;
private final Map<String, String> connectionProperties;
@JsonCreator
public JdbcOutputTableHandle(
@JsonProperty("connectorId") String connectorId,
@JsonProperty("catalogName") @Nullable String catalogName,
@JsonProperty("schemaName") @Nullable String schemaName,
@JsonProperty("tableName") String tableName,
@JsonProperty("columnNames") List<String> columnNames,
@JsonProperty("columnTypes") List<Type> columnTypes,
@JsonProperty("tableOwner") String tableOwner,
@JsonProperty("temporaryTableName") String temporaryTableName,
@JsonProperty("connectionUrl") String connectionUrl,
@JsonProperty("connectionProperties") Map<String, String> connectionProperties)
{
this.connectorId = checkNotNull(connectorId, "connectorId is null");
this.catalogName = catalogName;
this.schemaName = schemaName;
this.tableName = checkNotNull(tableName, "tableName is null");
this.tableOwner = checkNotNull(tableOwner, "tableOwner is null");
this.temporaryTableName = checkNotNull(temporaryTableName, "temporaryTableName is null");
this.connectionUrl = checkNotNull(connectionUrl, "connectionUrl is null");
this.connectionProperties = ImmutableMap.copyOf(checkNotNull(connectionProperties, "connectionProperties is null"));
checkNotNull(columnNames, "columnNames is null");
checkNotNull(columnTypes, "columnTypes is null");
checkArgument(columnNames.size() == columnTypes.size(), "columnNames and columnTypes sizes don't match");
this.columnNames = ImmutableList.copyOf(columnNames);
this.columnTypes = ImmutableList.copyOf(columnTypes);
}
@JsonProperty
public String getConnectorId()
{
return connectorId;
}
@JsonProperty
@Nullable
public String getCatalogName()
{
return catalogName;
}
@JsonProperty
@Nullable
public String getSchemaName()
{
return schemaName;
}
@JsonProperty
public String getTableName()
{
return tableName;
}
@JsonProperty
public List<String> getColumnNames()
{
return columnNames;
}
@JsonProperty
public List<Type> getColumnTypes()
{
return columnTypes;
}
@JsonProperty
public String getTableOwner()
{
return tableOwner;
}
@JsonProperty
public String getTemporaryTableName()
{
return temporaryTableName;
}
@JsonProperty
public String getConnectionUrl()
{
return connectionUrl;
}
@JsonProperty
public Map<String, String> getConnectionProperties()
{
return connectionProperties;
}
@Override
public String toString()
{
return format("jdbc:%s.%s.%s", catalogName, schemaName, tableName);
}
@Override
public int hashCode()
{
return Objects.hash(
connectorId,
catalogName,
schemaName,
tableName,
columnNames,
columnTypes,
tableOwner,
temporaryTableName,
connectionUrl,
connectionProperties);
}
@Override
public boolean equals(Object obj)
{
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
JdbcOutputTableHandle other = (JdbcOutputTableHandle) obj;
return Objects.equals(this.connectorId, other.connectorId) &&
Objects.equals(this.catalogName, other.catalogName) &&
Objects.equals(this.schemaName, other.schemaName) &&
Objects.equals(this.tableName, other.tableName) &&
Objects.equals(this.columnNames, other.columnNames) &&
Objects.equals(this.columnTypes, other.columnTypes) &&
Objects.equals(this.tableOwner, other.tableOwner) &&
Objects.equals(this.temporaryTableName, other.temporaryTableName) &&
Objects.equals(this.connectionUrl, other.connectionUrl) &&
Objects.equals(this.connectionProperties, other.connectionProperties);
}
}

@ -0,0 +1,59 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.plugin.jdbc;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorPartition;
import com.facebook.presto.spi.TupleDomain;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkNotNull;
public class JdbcPartition
implements ConnectorPartition
{
private final JdbcTableHandle jdbcTableHandle;
private final TupleDomain<ColumnHandle> domain;
public JdbcPartition(JdbcTableHandle jdbcTableHandle, TupleDomain<ColumnHandle> domain)
{
this.jdbcTableHandle = checkNotNull(jdbcTableHandle, "jdbcTableHandle is null");
this.domain = checkNotNull(domain, "domain is null");
}
@Override
public String getPartitionId()
{
return jdbcTableHandle.toString();
}
public JdbcTableHandle getJdbcTableHandle()
{
return jdbcTableHandle;
}
@Override
public TupleDomain<ColumnHandle> getTupleDomain()
{
return domain;
}
@Override
public String toString()
{
return toStringHelper(this)
.add("jdbcTableHandle", jdbcTableHandle)
.toString();
}
}

@ -0,0 +1,63 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.plugin.jdbc;
import com.facebook.presto.spi.ConnectorFactory;
import com.facebook.presto.spi.Plugin;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Module;
import java.util.List;
import java.util.Map;
import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Strings.isNullOrEmpty;
public class JdbcPlugin
implements Plugin
{
private final String name;
private final Module module;
private Map<String, String> optionalConfig = ImmutableMap.of();
public JdbcPlugin(String name, Module module)
{
checkArgument(!isNullOrEmpty(name), "name is null or empty");
this.name = name;
this.module = checkNotNull(module, "module is null");
}
@Override
public void setOptionalConfig(Map<String, String> optionalConfig)
{
this.optionalConfig = ImmutableMap.copyOf(checkNotNull(optionalConfig, "optionalConfig is null"));
}
@Override
public <T> List<T> getServices(Class<T> type)
{
if (type == ConnectorFactory.class) {
return ImmutableList.of(type.cast(new JdbcConnectorFactory(name, module, optionalConfig, getClassLoader())));
}
return ImmutableList.of();
}
private static ClassLoader getClassLoader()
{
return firstNonNull(Thread.currentThread().getContextClassLoader(), JdbcPlugin.class.getClassLoader());
}
}

@ -0,0 +1,58 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.plugin.jdbc;
import com.facebook.presto.spi.RecordCursor;
import com.facebook.presto.spi.RecordSet;
import com.facebook.presto.spi.type.Type;
import com.google.common.collect.ImmutableList;
import java.util.List;
import static com.google.common.base.Preconditions.checkNotNull;
public class JdbcRecordSet
implements RecordSet
{
private final JdbcClient jdbcClient;
private final List<JdbcColumnHandle> columnHandles;
private final List<Type> columnTypes;
private final JdbcSplit split;
public JdbcRecordSet(JdbcClient jdbcClient, JdbcSplit split, List<JdbcColumnHandle> columnHandles)
{
this.jdbcClient = checkNotNull(jdbcClient, "jdbcClient is null");
this.split = checkNotNull(split, "split is null");
checkNotNull(split, "split is null");
this.columnHandles = checkNotNull(columnHandles, "column handles is null");
ImmutableList.Builder<Type> types = ImmutableList.builder();
for (JdbcColumnHandle column : columnHandles) {
types.add(column.getColumnType());
}
this.columnTypes = types.build();
}
@Override
public List<Type> getColumnTypes()
{
return columnTypes;
}
@Override
public RecordCursor cursor()
{
return new JdbcRecordCursor(jdbcClient, split, columnHandles);
}
}

@ -0,0 +1,52 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.plugin.jdbc;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorRecordSetProvider;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.RecordSet;
import com.google.common.collect.ImmutableList;
import javax.inject.Inject;
import java.util.List;
import static com.facebook.presto.plugin.jdbc.Types.checkType;
import static com.google.common.base.Preconditions.checkNotNull;
public class JdbcRecordSetProvider
implements ConnectorRecordSetProvider
{
private final JdbcClient jdbcClient;
@Inject
public JdbcRecordSetProvider(JdbcClient jdbcClient)
{
this.jdbcClient = checkNotNull(jdbcClient, "jdbcClient is null");
}
@Override
public RecordSet getRecordSet(ConnectorSplit split, List<? extends ColumnHandle> columns)
{
JdbcSplit jdbcSplit = checkType(split, JdbcSplit.class, "split");
ImmutableList.Builder<JdbcColumnHandle> handles = ImmutableList.builder();
for (ColumnHandle handle : columns) {
handles.add(checkType(handle, JdbcColumnHandle.class, "columnHandle"));
}
return new JdbcRecordSet(jdbcClient, jdbcSplit, handles.build());
}
}

@ -0,0 +1,205 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.plugin.jdbc;
import com.facebook.presto.spi.RecordSink;
import com.facebook.presto.spi.type.Type;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;
import org.joda.time.DateTimeZone;
import org.joda.time.chrono.ISOChronology;
import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static com.facebook.presto.spi.type.DateType.DATE;
import static com.google.common.base.Preconditions.checkState;
import static java.nio.charset.StandardCharsets.UTF_8;
public class JdbcRecordSink
implements RecordSink
{
private final Connection connection;
private final PreparedStatement statement;
private final int fieldCount;
private final List<Type> columnTypes;
private int field = -1;
private int batchSize;
public JdbcRecordSink(JdbcOutputTableHandle handle, JdbcClient jdbcClient)
{
try {
connection = jdbcClient.getConnection(handle);
connection.setAutoCommit(false);
}
catch (SQLException e) {
throw Throwables.propagate(e);
}
try {
statement = connection.prepareStatement(jdbcClient.buildInsertSql(handle));
}
catch (SQLException e) {
throw Throwables.propagate(e);
}
fieldCount = handle.getColumnNames().size();
columnTypes = handle.getColumnTypes();
}
@Override
public void beginRecord(long sampleWeight)
{
checkState(field == -1, "already in record");
field = 0;
}
@Override
public void finishRecord()
{
checkState(field != -1, "not in record");
checkState(field == fieldCount, "not all fields set");
field = -1;
try {
statement.addBatch();
batchSize++;
if (batchSize >= 1000) {
statement.executeBatch();
connection.commit();
connection.setAutoCommit(false);
batchSize = 0;
}
}
catch (SQLException e) {
throw Throwables.propagate(e);
}
}
@Override
public void appendNull()
{
try {
statement.setObject(next(), null);
}
catch (SQLException e) {
throw Throwables.propagate(e);
}
}
@Override
public void appendBoolean(boolean value)
{
try {
statement.setBoolean(next(), value);
}
catch (SQLException e) {
throw Throwables.propagate(e);
}
}
@Override
public void appendLong(long value)
{
try {
if (DATE.equals(columnTypes.get(field))) {
// convert to midnight in default time zone
long utcMillis = TimeUnit.DAYS.toMillis(value);
long localMillis = ISOChronology.getInstanceUTC().getZone().getMillisKeepLocal(DateTimeZone.getDefault(), utcMillis);
statement.setDate(next(), new Date(localMillis));
}
else {
statement.setLong(next(), value);
}
}
catch (SQLException e) {
throw Throwables.propagate(e);
}
}
@Override
public void appendDouble(double value)
{
try {
statement.setDouble(next(), value);
}
catch (SQLException e) {
throw Throwables.propagate(e);
}
}
@Override
public void appendString(byte[] value)
{
try {
statement.setString(next(), new String(value, UTF_8));
}
catch (SQLException e) {
throw Throwables.propagate(e);
}
}
@Override
public Collection<Slice> commit()
{
// commit and close
try (Connection connection = this.connection) {
if (batchSize > 0) {
statement.executeBatch();
connection.commit();
}
}
catch (SQLException e) {
throw Throwables.propagate(e);
}
// the committer does not need any additional info
return ImmutableList.of();
}
@SuppressWarnings("UnusedDeclaration")
@Override
public void rollback()
{
// rollback and close
try (Connection connection = this.connection;
PreparedStatement statement = this.statement) {
connection.rollback();
}
catch (SQLException e) {
throw Throwables.propagate(e);
}
}
@Override
public List<Type> getColumnTypes()
{
return columnTypes;
}
private int next()
{
checkState(field != -1, "not in record");
checkState(field < fieldCount, "all fields already set");
field++;
return field;
}
}

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save